Skip to content

Commit 1de3ee6

Browse files
khushijain21VihasMakwanaandrzej-stencel
authored
[fileconsumer] Compute fingerprint for compressed files by decompressing its data (open-telemetry#40256)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR changes how fingerprint for compressed files is computed. It now decompresses first 'N' bytes to compute the fingerprint. This ensures that data will not be re-ingested when compressed and uncompressed files exist with the same content. One such case is when log files are rotated and compressed. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#37772 <!--Describe what testing was performed and which tests were added.--> #### Testing Have `test.log` and `test.log.gz` in your path. If the content of the compressed file is the same as that of the uncompressed file, the logs will only be ingested once. Otherwise, we have two readers - one for plain text and the other for gzipped file. Start otelcollector-contrib with following config file ``` receivers: filelog: include: [ "./test.log*" ] # Path to log file start_at: beginning # Read from the start of the file include_file_path: true include_file_name: true compression: auto exporters: debug/1: verbosity: detailed service: telemetry: logs: level: debug pipelines: logs: receivers: [filelog] exporters: [debug/1] ``` you can see only one set of logs are emitted --------- Co-authored-by: Vihas Makwana <[email protected]> Co-authored-by: Andrzej Stencel <[email protected]>
1 parent ec280e5 commit 1de3ee6

File tree

6 files changed

+118
-10
lines changed

6 files changed

+118
-10
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: filelogreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: The fingerprint of gzip compressed files is created by decompressing and reading the first `fingerprint_size` bytes.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37772]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: This feature can be enabled via the following feature gate `--feature-gates=filelog.decompressFingerprint`. This can cause existing gzip files to be re-ingested because of changes in how fingerprints are computed.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

pkg/stanza/fileconsumer/internal/fingerprint/fingerprint.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,29 @@ package fingerprint // import "github.com/open-telemetry/opentelemetry-collector
55

66
import (
77
"bytes"
8+
"compress/gzip"
89
"encoding/json"
910
"errors"
1011
"fmt"
1112
"io"
1213
"os"
14+
"path/filepath"
15+
16+
"go.opentelemetry.io/collector/featuregate"
1317
)
1418

1519
const DefaultSize = 1000 // bytes
1620

1721
const MinSize = 16 // bytes
1822

23+
var DecompressedFingerprintFeatureGate = featuregate.GlobalRegistry().MustRegister(
24+
"filelog.decompressFingerprint",
25+
featuregate.StageAlpha,
26+
featuregate.WithRegisterDescription("Computes fingerprint for compressed files by decompressing its data"),
27+
featuregate.WithRegisterFromVersion("v0.128.0"),
28+
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/40256"),
29+
)
30+
1931
// Fingerprint is used to identify a file
2032
// A file's fingerprint is the first N bytes of the file
2133
type Fingerprint struct {
@@ -26,15 +38,40 @@ func New(first []byte) *Fingerprint {
2638
return &Fingerprint{firstBytes: first}
2739
}
2840

29-
func NewFromFile(file *os.File, size int) (*Fingerprint, error) {
41+
// NewFromFile computes fingerprint of the given file using first 'N' bytes
42+
// Set decompressData to true to compute fingerprint of compressed files by decompressing its data first
43+
func NewFromFile(file *os.File, size int, decompressData bool) (*Fingerprint, error) {
3044
buf := make([]byte, size)
45+
if DecompressedFingerprintFeatureGate.IsEnabled() {
46+
if decompressData {
47+
if hasGzipExtension(file.Name()) {
48+
// If the file is of compressed type, uncompress the data before creating its fingerprint
49+
uncompressedData, err := gzip.NewReader(file)
50+
if err != nil {
51+
return nil, fmt.Errorf("error uncompressing gzip file: %w", err)
52+
}
53+
defer uncompressedData.Close()
54+
55+
n, err := uncompressedData.Read(buf)
56+
if err != nil && !errors.Is(err, io.EOF) {
57+
return nil, fmt.Errorf("error reading fingerprint bytes: %w", err)
58+
}
59+
return New(buf[:n]), nil
60+
}
61+
}
62+
}
63+
3164
n, err := file.ReadAt(buf, 0)
3265
if err != nil && !errors.Is(err, io.EOF) {
3366
return nil, fmt.Errorf("reading fingerprint bytes: %w", err)
3467
}
3568
return New(buf[:n]), nil
3669
}
3770

71+
func hasGzipExtension(filename string) bool {
72+
return filepath.Ext(filename) == ".gz"
73+
}
74+
3875
// Copy creates a new copy of the fingerprint
3976
func (f Fingerprint) Copy() *Fingerprint {
4077
buf := make([]byte, len(f.firstBytes), cap(f.firstBytes))

pkg/stanza/fileconsumer/internal/fingerprint/fingerprint_test.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,18 @@
44
package fingerprint
55

66
import (
7+
"compress/gzip"
78
"encoding/binary"
89
"fmt"
10+
"io"
911
"math/rand/v2"
1012
"os"
1113
"testing"
1214

1315
"github.com/stretchr/testify/require"
16+
"go.opentelemetry.io/collector/featuregate"
17+
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/internal/filetest"
1419
)
1520

1621
func TestNewDoesNotModifyOffset(t *testing.T) {
@@ -37,7 +42,7 @@ func TestNewDoesNotModifyOffset(t *testing.T) {
3742
_, err = temp.Seek(0, 0)
3843
require.NoError(t, err)
3944

40-
fp, err := NewFromFile(temp, len(fingerprint))
45+
fp, err := NewFromFile(temp, len(fingerprint), false)
4146
require.NoError(t, err)
4247

4348
// Validate the fingerprint is the correct size
@@ -131,7 +136,7 @@ func TestNewFromFile(t *testing.T) {
131136
require.NoError(t, err)
132137
require.Equal(t, tc.fileSize, int(info.Size()))
133138

134-
fp, err := NewFromFile(temp, tc.fingerprintSize)
139+
fp, err := NewFromFile(temp, tc.fingerprintSize, false)
135140
require.NoError(t, err)
136141

137142
require.Len(t, fp.firstBytes, tc.expectedLen)
@@ -264,7 +269,7 @@ func TestStartsWith_FromFile(t *testing.T) {
264269
_, err = fullFile.Write(content)
265270
require.NoError(t, err)
266271

267-
fff, err := NewFromFile(fullFile, fingerprintSize)
272+
fff, err := NewFromFile(fullFile, fingerprintSize, false)
268273
require.NoError(t, err)
269274

270275
partialFile, err := os.CreateTemp(tempDir, "")
@@ -282,7 +287,7 @@ func TestStartsWith_FromFile(t *testing.T) {
282287
_, err = partialFile.Write(content[i:i])
283288
require.NoError(t, err)
284289

285-
pff, err := NewFromFile(partialFile, fingerprintSize)
290+
pff, err := NewFromFile(partialFile, fingerprintSize, false)
286291
require.NoError(t, err)
287292

288293
require.True(t, fff.StartsWith(pff))
@@ -308,3 +313,29 @@ func TestMarshalUnmarshal(t *testing.T) {
308313

309314
require.Equal(t, fp, fp2)
310315
}
316+
317+
// Test compressed and uncompressed file with same content have equal fingerprint
318+
func TestCompressionFingerprint(t *testing.T) {
319+
require.NoError(t, featuregate.GlobalRegistry().Set(DecompressedFingerprintFeatureGate.ID(), true))
320+
tmp := t.TempDir()
321+
compressedFile := filetest.OpenTempWithPattern(t, tmp, "*.gz")
322+
gzipWriter := gzip.NewWriter(compressedFile)
323+
defer gzipWriter.Close()
324+
325+
data := []byte("this is a first test line")
326+
// Write data
327+
n, err := gzipWriter.Write(data)
328+
require.NoError(t, err)
329+
require.NoError(t, gzipWriter.Close())
330+
require.NotZero(t, n, "gzip file should not be empty")
331+
332+
// set seek to the start of the file
333+
_, err = compressedFile.Seek(0, io.SeekStart)
334+
require.NoError(t, err)
335+
336+
compressedFP, err := NewFromFile(compressedFile, len(data), true)
337+
require.NoError(t, err)
338+
339+
uncompressedFP := New(data)
340+
uncompressedFP.Equal(compressedFP)
341+
}

pkg/stanza/fileconsumer/internal/reader/factory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type Factory struct {
5252
}
5353

5454
func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
55-
return fingerprint.NewFromFile(file, f.FingerprintSize)
55+
return fingerprint.NewFromFile(file, f.FingerprintSize, f.Compression != "")
5656
}
5757

5858
func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) {
@@ -98,7 +98,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
9898

9999
if r.Fingerprint.Len() > r.fingerprintSize {
100100
// User has reconfigured fingerprint_size
101-
shorter, rereadErr := fingerprint.NewFromFile(file, r.fingerprintSize)
101+
shorter, rereadErr := fingerprint.NewFromFile(file, r.fingerprintSize, r.compression != "")
102102
if rereadErr != nil {
103103
return nil, fmt.Errorf("reread fingerprint: %w", rereadErr)
104104
}

pkg/stanza/fileconsumer/internal/reader/reader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ func (r *Reader) Validate() bool {
320320
if r.file == nil {
321321
return false
322322
}
323-
refreshedFingerprint, err := fingerprint.NewFromFile(r.file, r.fingerprintSize)
323+
refreshedFingerprint, err := fingerprint.NewFromFile(r.file, r.fingerprintSize, r.compression != "")
324324
if err != nil {
325325
return false
326326
}
@@ -343,7 +343,7 @@ func (r *Reader) updateFingerprint() {
343343
if r.file == nil {
344344
return
345345
}
346-
refreshedFingerprint, err := fingerprint.NewFromFile(r.file, r.fingerprintSize)
346+
refreshedFingerprint, err := fingerprint.NewFromFile(r.file, r.fingerprintSize, r.compression != "")
347347
if err != nil {
348348
return
349349
}

receiver/filelogreceiver/README.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ Tails and parses logs from files.
6464
| `ordering_criteria.sort_by.location` | | Relevant if `sort_type` is set to `timestamp`. Defines the location of the timestamp of the file. |
6565
| `ordering_criteria.sort_by.format` | | Relevant if `sort_type` is set to `timestamp`. Defines the strptime format of the timestamp being sorted. |
6666
| `ordering_criteria.sort_by.ascending` | | Sort direction |
67-
| `compression` | | Indicate the compression format of input files. If set accordingly, files will be read using a reader that uncompresses the file before scanning its content. Options are ``, `gzip`, or `auto`. `auto` auto-detects file compression type. Currently, gzip files are the only compressed files auto-detected, based on ".gz" filename extension. `auto` option is useful when ingesting a mix of compressed and uncompressed files with the same filelogreceiver. |
67+
| `compression` | | Indicate the compression format of input files. If set accordingly, files will be read using a reader that uncompresses the file before scanning its content. Options are ``, `gzip`, or `auto`. `auto` auto-detects file compression type. Currently, gzip files are the only compressed files auto-detected, based on ".gz" filename extension. `auto` option is useful when ingesting a mix of compressed and uncompressed files with the same filelogreceiver. |
6868

6969
Note that _by default_, no logs will be read from a file that is not actively being written to because `start_at` defaults to `end`.
7070

@@ -232,3 +232,16 @@ Enabling [Collector metrics](https://opentelemetry.io/docs/collector/internal-te
232232
will also provide telemetry metrics for the state of the receiver's file consumption.
233233
Specifically, the `otelcol_fileconsumer_open_files` and `otelcol_fileconsumer_reading_files` metrics
234234
are provided.
235+
236+
## Feature Gates
237+
238+
### `filelog.decompressFingerprint`
239+
240+
When this feature gate is enabled, the fingerprint of compressed file is computed by first decompressing its data. Note, it is important to set `compression` to a non-empty value for it to work.
241+
242+
This can cause existing gzip files to be re-ingested because of changes in how fingerprints are computed.
243+
244+
Schedule for this feature gate is:
245+
246+
- Introduce as `Alpha` (disabled by default) in `v0.128.0`
247+
- Move to `Beta` (enabled by default) in `v0.129.0`

0 commit comments

Comments
 (0)