
Commit 88c8dc4

perf: improve memory consumption in 2ms file walk (#287)
**Proposed Changes**

- Separate secret detection logic for the filesystem plugin
- Chunking when the processed file is larger than 10 MB (with peeking so as not to lose the context of the secrets)
- Weighted semaphore to control memory use (with a dynamic budget, i.e., dependent on the host's RAM)

**Checklist**

- [x] I covered my changes with tests.
- [ ] I updated the documentation that is affected by my changes:
  - [ ] Change in the CLI arguments
  - [ ] Change in the configuration file

---------

Co-authored-by: cx-leonardo-fontes <[email protected]>
1 parent 5c33218 commit 88c8dc4
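The weighted semaphore described above (a memory budget that scales with the host's RAM) does not appear in the three files excerpted below. The following is a rough sketch of the idea only, assuming golang.org/x/sync/semaphore, a made-up hostMemoryBytes() helper, and an arbitrary budget fraction; it is not the actual 2ms implementation:

```go
package main

import (
    "context"
    "fmt"
    "sync"

    "golang.org/x/sync/semaphore"
)

// hostMemoryBytes is a hypothetical helper; a real implementation would query
// the host's available RAM (e.g. via a system-info library).
func hostMemoryBytes() int64 { return 8 << 30 } // assume 8 GiB for the sketch

func main() {
    // dynamic budget: let in-flight file buffers use at most a fraction of host RAM
    sem := semaphore.NewWeighted(hostMemoryBytes() / 4)
    ctx := context.Background()

    files := map[string]int64{"a.txt": 1 << 20, "b.bin": 64 << 20} // path -> size

    var wg sync.WaitGroup
    for path, size := range files {
        // block until enough of the memory budget is free to hold this file
        if err := sem.Acquire(ctx, size); err != nil {
            fmt.Println("acquire failed:", err)
            continue
        }
        wg.Add(1)
        go func(path string, size int64) {
            defer wg.Done()
            defer sem.Release(size) // give the bytes back when the scan finishes
            fmt.Println("scanning", path) // ... read and detect secrets here ...
        }(path, size)
    }
    wg.Wait()
}
```

Acquire blocks whenever the bytes held by in-flight scans would exceed the budget, which is what caps peak memory during the file walk.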

File tree

15 files changed: +1424 −124 lines

cmd/main.go

Lines changed: 5 additions & 5 deletions
@@ -131,17 +131,17 @@ func preRun(pluginName string, cmd *cobra.Command, args []string) error {
         return err
     }
 
-    engine, err := engine.Init(engineConfigVar)
+    engineInstance, err := engine.Init(engineConfigVar)
     if err != nil {
         return err
     }
 
-    if err := engine.AddRegexRules(customRegexRuleVar); err != nil {
+    if err := engineInstance.AddRegexRules(customRegexRuleVar); err != nil {
         return err
     }
 
     Channels.WaitGroup.Add(1)
-    go ProcessItems(engine, pluginName)
+    go ProcessItems(engineInstance, pluginName)
 
     Channels.WaitGroup.Add(1)
     go ProcessSecrets()
@@ -151,10 +151,10 @@ func preRun(pluginName string, cmd *cobra.Command, args []string) error {
 
     if validateVar {
         Channels.WaitGroup.Add(1)
-        go ProcessValidationAndScoreWithValidation(engine)
+        go ProcessValidationAndScoreWithValidation(engineInstance)
     } else {
         Channels.WaitGroup.Add(1)
-        go ProcessScoreWithoutValidation(engine)
+        go ProcessScoreWithoutValidation(engineInstance)
     }
 
     return nil

cmd/workers.go

Lines changed: 23 additions & 7 deletions
@@ -1,21 +1,37 @@
 package cmd
 
 import (
+    "context"
     "github.com/checkmarx/2ms/engine"
     "github.com/checkmarx/2ms/engine/extra"
     "github.com/checkmarx/2ms/lib/secrets"
+    "golang.org/x/sync/errgroup"
     "sync"
 )
 
-func ProcessItems(engine *engine.Engine, pluginName string) {
+func ProcessItems(engineInstance engine.IEngine, pluginName string) {
     defer Channels.WaitGroup.Done()
-    wgItems := &sync.WaitGroup{}
+
+    g, ctx := errgroup.WithContext(context.Background())
     for item := range Channels.Items {
         Report.TotalItemsScanned++
-        wgItems.Add(1)
-        go engine.Detect(item, SecretsChan, wgItems, pluginName, Channels.Errors)
+        item := item
+
+        switch pluginName {
+        case "filesystem":
+            g.Go(func() error {
+                return engineInstance.DetectFile(ctx, item, SecretsChan)
+            })
+        default:
+            g.Go(func() error {
+                return engineInstance.DetectFragment(item, SecretsChan, pluginName)
+            })
+        }
+    }
+
+    if err := g.Wait(); err != nil {
+        Channels.Errors <- err
     }
-    wgItems.Wait()
     close(SecretsChan)
 }
 
@@ -48,7 +64,7 @@ func ProcessSecretsExtras() {
     wgExtras.Wait()
 }
 
-func ProcessValidationAndScoreWithValidation(engine *engine.Engine) {
+func ProcessValidationAndScoreWithValidation(engine engine.IEngine) {
     defer Channels.WaitGroup.Done()
 
     wgValidation := &sync.WaitGroup{}
@@ -64,7 +80,7 @@ func ProcessValidationAndScoreWithValidation(engine *engine.Engine) {
     engine.Validate()
 }
 
-func ProcessScoreWithoutValidation(engine *engine.Engine) {
+func ProcessScoreWithoutValidation(engine engine.IEngine) {
     defer Channels.WaitGroup.Done()
 
     wgScore := &sync.WaitGroup{}
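ProcessItems now fans out through an errgroup instead of a bare WaitGroup. For readers unfamiliar with the package, here is a minimal standalone sketch (not 2ms code) of the semantics the new code relies on: the derived ctx is cancelled as soon as one goroutine returns an error, and g.Wait() blocks until every goroutine finishes, then returns that first error.

```go
package main

import (
    "context"
    "errors"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    for i := 0; i < 3; i++ {
        i := i // capture the loop variable, as the diff does with `item := item`
        g.Go(func() error {
            if i == 1 {
                return errors.New("boom") // first failure cancels ctx for the others
            }
            <-ctx.Done() // workers watching ctx can stop early
            return nil
        })
    }

    // Wait blocks until all goroutines return and yields the first error
    fmt.Println(g.Wait()) // prints "boom"
}
```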

engine/chunk/chunk.go

Lines changed: 206 additions & 0 deletions
@@ -0,0 +1,206 @@
+package chunk
+
+//go:generate mockgen -source=$GOFILE -destination=${GOPACKAGE}_mock.go -package=${GOPACKAGE}
+
+import (
+    "bufio"
+    "bytes"
+    "errors"
+    "fmt"
+    "sync"
+    "unicode"
+
+    "github.com/h2non/filetype"
+)
+
+const (
+    defaultSize          = 100 * 1024      // 100Kib
+    defaultMaxPeekSize   = 25 * 1024       // 25Kib
+    defaultFileThreshold = 1 * 1024 * 1024 // 1MiB
+)
+
+var ErrUnsupportedFileType = errors.New("unsupported file type")
+
+type Option func(*Chunk)
+
+// WithSize sets the chunk size
+func WithSize(size int) Option {
+    return func(args *Chunk) {
+        args.size = size
+    }
+}
+
+// WithMaxPeekSize sets the max size of look-ahead bytes
+func WithMaxPeekSize(maxPeekSize int) Option {
+    return func(args *Chunk) {
+        args.maxPeekSize = maxPeekSize
+    }
+}
+
+// WithSmallFileThreshold sets the threshold for small files
+func WithSmallFileThreshold(smallFileThreshold int64) Option {
+    return func(args *Chunk) {
+        args.smallFileThreshold = smallFileThreshold
+    }
+}
+
+// Chunk holds two pools and sizing parameters needed for reading chunks of data with look-ahead
+type Chunk struct {
+    bufPool            *sync.Pool // *bytes.Buffer with cap Size + MaxPeekSize
+    peekedBufPool      *sync.Pool // *[]byte slices of length Size + MaxPeekSize
+    size               int        // base chunk size
+    maxPeekSize        int        // max size of look-ahead bytes
+    smallFileThreshold int64      // files smaller than this skip chunking
+}
+
+type IChunk interface {
+    GetSize() int
+    GetMaxPeekSize() int
+    GetFileThreshold() int64
+    ReadChunk(reader *bufio.Reader, totalLines int) (string, error)
+}
+
+func New(opts ...Option) *Chunk {
+    // set default options
+    c := &Chunk{
+        size:               defaultSize,
+        maxPeekSize:        defaultMaxPeekSize,
+        smallFileThreshold: defaultFileThreshold,
+    }
+    // apply overrides
+    for _, opt := range opts {
+        opt(c)
+    }
+    c.bufPool = &sync.Pool{
+        New: func() interface{} {
+            // pre-allocate dynamic-size buffer for reading chunks (up to chunk size + peek size)
+            return bytes.NewBuffer(make([]byte, 0, c.size+c.maxPeekSize))
+        },
+    }
+    c.peekedBufPool = &sync.Pool{
+        New: func() interface{} {
+            // pre-allocate fixed-size block for loading chunks
+            b := make([]byte, c.size+c.maxPeekSize)
+            return &b
+        },
+    }
+    return c
+}
+
+// GetBuf returns a bytes.Buffer from the pool, seeded with the data
+func (c *Chunk) GetBuf(data []byte) (*bytes.Buffer, bool) {
+    window, ok := c.bufPool.Get().(*bytes.Buffer)
+    if !ok {
+        return nil, false
+    }
+    window.Write(data) // seed the buffer with the data
+    return window, ok
+}
+
+// PutBuf returns the bytes.Buffer to the pool
+func (c *Chunk) PutBuf(window *bytes.Buffer) {
+    window.Reset()
+    c.bufPool.Put(window)
+}
+
+// GetPeekedBuf returns a fixed-size []byte from the pool
+func (c *Chunk) GetPeekedBuf() (*[]byte, bool) {
+    b, ok := c.peekedBufPool.Get().(*[]byte)
+    return b, ok
+}
+
+// PutPeekedBuf returns the fixed-size []byte to the pool
+func (c *Chunk) PutPeekedBuf(b *[]byte) {
+    *b = (*b)[:0] // reset the slice to zero length
+    c.peekedBufPool.Put(b)
+}
+
+func (c *Chunk) GetSize() int {
+    return c.size
+}
+
+func (c *Chunk) GetMaxPeekSize() int {
+    return c.maxPeekSize
+}
+
+func (c *Chunk) GetFileThreshold() int64 {
+    return c.smallFileThreshold
+}
+
+// ReadChunk reads the next chunk of data from file
+func (c *Chunk) ReadChunk(reader *bufio.Reader, totalLines int) (string, error) {
+    // borrow a []bytes from the pool and seed it with raw data from file (up to chunk size + peek size)
+    rawData, ok := c.GetPeekedBuf()
+    if !ok {
+        return "", fmt.Errorf("expected *bytes.Buffer, got %T", rawData)
+    }
+    defer c.PutPeekedBuf(rawData)
+    n, err := reader.Read(*rawData)
+
+    var chunkStr string
+    // "Callers should always process the n > 0 bytes returned before considering the error err."
+    // https://pkg.go.dev/io#Reader
+    if n > 0 {
+        // only check the filetype at the start of file
+        if totalLines == 0 && ShouldSkipFile((*rawData)[:n]) {
+            return "", fmt.Errorf("skipping file: %w", ErrUnsupportedFileType)
+        }
+
+        chunkStr, err = c.generateChunk((*rawData)[:n])
+    }
+    if err != nil {
+        return "", err
+    }
+    return chunkStr, nil
+}
+
+// generateChunk processes block of raw data and generates chunk to be scanned
+func (c *Chunk) generateChunk(rawData []byte) (string, error) {
+    // Borrow a buffer from the pool and seed it with raw data (up to chunk size)
+    initialChunkLen := min(len(rawData), c.size)
+    chunkData, ok := c.GetBuf(rawData[:initialChunkLen])
+    if !ok {
+        return "", fmt.Errorf("expected *bytes.Buffer, got %T", chunkData)
+    }
+    defer c.PutBuf(chunkData)
+
+    // keep seeding chunk until detecting the "\n...\n" (i.e. safe boundary)
+    // or reaching the max limit of chunk size (i.e. chunk size + peek size)
+    for i := chunkData.Len(); i < len(rawData); i++ {
+        if endsWithTwoNewlines(rawData[:i]) {
+            break
+        }
+        chunkData.WriteByte(rawData[i])
+    }
+
+    return chunkData.String(), nil
+}
+
+// endsWithTwoNewlines returns true if b ends in at least two '\n's (ignoring any number of ' ', '\r', or '\t' between them)
+func endsWithTwoNewlines(b []byte) bool {
+    count := 0
+    for i := len(b) - 1; i >= 0; i-- {
+        if b[i] == '\n' {
+            count++
+            if count >= 2 {
+                return true
+            }
+        } else if unicode.IsSpace(rune(b[i])) {
+            // the presence of other whitespace characters (`\r`, ` `, `\t`) shouldn't reset the count
+            continue
+        } else {
+            return false
+        }
+    }
+    return false
+}
+
+// ShouldSkipFile checks if the file should be skipped based on its content type
+func ShouldSkipFile(data []byte) bool {
+    // TODO: could other optimizations be introduced here?
+    mimetype, err := filetype.Match(data)
+    if err != nil {
+        return true // could not determine file type
+    }
+    return mimetype.MIME.Type == "application" // skip binary files
+}
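The read loop that drives this API (in the filesystem detector) is not part of this excerpt. Below is a hypothetical usage sketch built only from the signatures above; the file name, the loop-termination checks, and the way totalLines is advanced are assumptions:

```go
package main

import (
    "bufio"
    "errors"
    "fmt"
    "io"
    "os"
    "strings"

    "github.com/checkmarx/2ms/engine/chunk"
)

func main() {
    f, err := os.Open("large_file.log") // hypothetical input
    if err != nil {
        panic(err)
    }
    defer f.Close()

    c := chunk.New() // defaults from the constants above: 100KiB chunk, 25KiB peek
    reader := bufio.NewReaderSize(f, c.GetSize()+c.GetMaxPeekSize())

    totalLines := 0
    for {
        chunkStr, err := c.ReadChunk(reader, totalLines)
        if errors.Is(err, io.EOF) {
            break // no more data
        }
        if errors.Is(err, chunk.ErrUnsupportedFileType) {
            return // binary content detected on the first read, skip the file
        }
        if err != nil {
            panic(err)
        }
        totalLines += strings.Count(chunkStr, "\n") // keeps the filetype check limited to the first chunk
        fmt.Printf("would hand a %d-byte chunk to the detector here\n", len(chunkStr))
    }
}
```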
