Skip to content

Commit eab4790

Browse files
committed
refactor(event): Improve callstack decorator
Keep the stack walk events indexed by stack id. The stack id is a tuple of pid,tid associated with the thread executing the code. For every unique stack id, the queue of pending events is maintained. When the corresponding stack walk event arrives, the oldest event is pulled from the queue and enriched with stack addresses.
1 parent 3b8494e commit eab4790

File tree

6 files changed

+99
-43
lines changed

6 files changed

+99
-43
lines changed

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ require (
99
github.com/cenkalti/backoff/v4 v4.3.0
1010
github.com/dustin/go-humanize v1.0.0
1111
github.com/enescakir/emoji v1.0.0
12-
github.com/gammazero/deque v0.2.1
1312
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e
1413
github.com/hashicorp/go-version v1.2.1
1514
github.com/hillu/go-yara/v4 v4.2.4

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu
5858
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
5959
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
6060
github.com/fzipp/gocyclo v0.3.1/go.mod h1:DJHO6AUmbdqj2ET4Z9iArSuwWgYDRryYt2wASxc7x3E=
61-
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
62-
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
6361
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
6462
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
6563
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=

internal/etw/processors/fs_windows.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package processors
2020

2121
import (
2222
"expvar"
23-
"github.com/gammazero/deque"
2423
"github.com/rabbitstack/fibratus/pkg/config"
2524
"github.com/rabbitstack/fibratus/pkg/fs"
2625
"github.com/rabbitstack/fibratus/pkg/handle"
@@ -60,9 +59,10 @@ type fsProcessor struct {
6059
devPathResolver fs.DevPathResolver
6160
config *config.Config
6261

63-
deq *deque.Deque[*kevent.Kevent]
64-
mu sync.Mutex
65-
purger *time.Ticker
62+
// buckets stores stack walk events per stack id
63+
buckets map[uint64][]*kevent.Kevent
64+
mu sync.Mutex
65+
purger *time.Ticker
6666

6767
quit chan struct{}
6868
}
@@ -88,7 +88,7 @@ func newFsProcessor(
8888
devMapper: devMapper,
8989
devPathResolver: devPathResolver,
9090
config: config,
91-
deq: deque.New[*kevent.Kevent](100),
91+
buckets: make(map[uint64][]*kevent.Kevent),
9292
purger: time.NewTicker(time.Second * 5),
9393
quit: make(chan struct{}, 1),
9494
}
@@ -159,7 +159,15 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
159159
if !kevent.IsCurrentProcDropped(e.PID) {
160160
f.mu.Lock()
161161
defer f.mu.Unlock()
162-
f.deq.PushBack(e)
162+
163+
// append the event to the bucket indexed by stack id
164+
id := e.StackID()
165+
q, ok := f.buckets[id]
166+
if !ok {
167+
f.buckets[id] = []*kevent.Kevent{e}
168+
} else {
169+
f.buckets[id] = append(q, e)
170+
}
163171
}
164172
case ktypes.FileOpEnd:
165173
// get the CreateFile pending event by IRP identifier
@@ -169,6 +177,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
169177
dispo = e.Kparams.MustGetUint64(kparams.FileExtraInfo)
170178
status = e.Kparams.MustGetUint32(kparams.NTStatus)
171179
)
180+
172181
if dispo > windows.FILE_MAXIMUM_DISPOSITION {
173182
return e, nil
174183
}
@@ -177,10 +186,12 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
177186
return e, nil
178187
}
179188
delete(f.irps, irp)
189+
180190
// reset the wait status to allow passage of this event to
181191
// the aggregator queue. Additionally, append params to it
182192
ev.WaitEnqueue = false
183193
fileObject := ev.Kparams.MustGetUint64(kparams.FileObject)
194+
184195
// try to get extended file info. If the file object is already
185196
// present in the map, we'll reuse the existing file information
186197
fileinfo, ok := f.files[fileObject]
@@ -191,9 +202,11 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
191202
fileinfo = f.getFileInfo(filepath, opts)
192203
f.files[fileObject] = fileinfo
193204
}
205+
194206
if f.config.Kstream.EnableHandleKevents {
195207
f.devPathResolver.AddPath(ev.GetParamAsString(kparams.FilePath))
196208
}
209+
197210
ev.AppendParam(kparams.NTStatus, kparams.Status, status)
198211
if fileinfo.Type != fs.Unknown {
199212
ev.AppendEnum(kparams.FileType, uint32(fileinfo.Type), fs.FileTypes)
@@ -205,17 +218,20 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
205218
// the events are delayed until the respective FileOpEnd
206219
// event arrives, we enable stack tracing for CreateFile
207220
// events. When the CreateFile event is generated, we store
208-
// it pending IRP map. Subsequently, the stack walk event is
209-
// generated which we put inside the queue. After FileOpEnd
210-
// arrives, the previous stack walk for CreateFile is popped
211-
// from the queue and the callstack parameter attached to the
221+
// it in pending IRP map. Subsequently, the stack walk event
222+
// is put inside the queue. After FileOpEnd event arrives,
223+
// the previous stack walk for CreateFile is popped from
224+
// the queue and the callstack parameter attached to the
212225
// event.
213226
if f.config.Kstream.StackEnrichment {
214227
f.mu.Lock()
215228
defer f.mu.Unlock()
216-
i := f.deq.RIndex(func(evt *kevent.Kevent) bool { return evt.StackID() == ev.StackID() })
217-
if i != -1 {
218-
s := f.deq.Remove(i)
229+
230+
id := ev.StackID()
231+
q, ok := f.buckets[id]
232+
if ok && len(q) > 0 {
233+
var s *kevent.Kevent
234+
s, f.buckets[id] = q[len(q)-1], q[:len(q)-1]
219235
callstack := s.Kparams.MustGetSlice(kparams.Callstack)
220236
ev.AppendParam(kparams.Callstack, kparams.Slice, callstack)
221237
}
@@ -228,6 +244,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
228244
return ev, nil
229245
}
230246
}
247+
231248
return ev, nil
232249
case ktypes.ReleaseFile:
233250
fileReleaseCount.Add(1)
@@ -297,6 +314,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
297314
return e, f.psnap.AddMmap(e)
298315
}
299316
}
317+
300318
return e, nil
301319
}
302320

@@ -336,13 +354,16 @@ func (f *fsProcessor) purge() {
336354
select {
337355
case <-f.purger.C:
338356
f.mu.Lock()
357+
339358
// evict unmatched stack traces
340-
for i := 0; i < f.deq.Len(); i++ {
341-
evt := f.deq.At(i)
342-
if time.Since(evt.Timestamp) > time.Second*30 {
343-
f.deq.Remove(i)
359+
for id, q := range f.buckets {
360+
for i, evt := range q {
361+
if time.Since(evt.Timestamp) > time.Second*30 {
362+
f.buckets[id] = append(q[:i], q[i+1:]...)
363+
}
344364
}
345365
}
366+
346367
f.mu.Unlock()
347368
case <-f.quit:
348369
return

pkg/kevent/callstack.go

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package kevent
2020

2121
import (
2222
"expvar"
23-
"github.com/gammazero/deque"
2423
"github.com/rabbitstack/fibratus/pkg/kevent/kparams"
2524
"github.com/rabbitstack/fibratus/pkg/util/multierror"
2625
"github.com/rabbitstack/fibratus/pkg/util/va"
@@ -378,9 +377,9 @@ func (s Callstack) CallsiteInsns(pid uint32, leading bool) []string {
378377
// popped from the queue and enriched with return addresses
379378
// which are later subject to symbolization.
380379
type CallstackDecorator struct {
381-
deq *deque.Deque[*Kevent]
382-
q *Queue
383-
mux sync.Mutex
380+
buckets map[uint64][]*Kevent
381+
q *Queue
382+
mux sync.Mutex
384383

385384
flusher *time.Ticker
386385
quit chan struct{}
@@ -392,19 +391,29 @@ type CallstackDecorator struct {
392391
func NewCallstackDecorator(q *Queue) *CallstackDecorator {
393392
c := &CallstackDecorator{
394393
q: q,
395-
deq: deque.New[*Kevent](100),
394+
buckets: make(map[uint64][]*Kevent),
396395
flusher: time.NewTicker(flusherInterval),
397396
quit: make(chan struct{}, 1),
398397
}
398+
399399
go c.doFlush()
400+
400401
return c
401402
}
402403

403404
// Push pushes a new event to the queue.
404405
func (cd *CallstackDecorator) Push(e *Kevent) {
405406
cd.mux.Lock()
406407
defer cd.mux.Unlock()
407-
cd.deq.PushBack(e)
408+
409+
// append the event to the bucket indexed by stack id
410+
id := e.StackID()
411+
q, ok := cd.buckets[id]
412+
if !ok {
413+
cd.buckets[id] = []*Kevent{e}
414+
} else {
415+
cd.buckets[id] = append(q, e)
416+
}
408417
}
409418

410419
// Pop receives the stack walk event and pops the oldest
@@ -414,13 +423,25 @@ func (cd *CallstackDecorator) Push(e *Kevent) {
414423
func (cd *CallstackDecorator) Pop(e *Kevent) *Kevent {
415424
cd.mux.Lock()
416425
defer cd.mux.Unlock()
417-
i := cd.deq.Index(func(evt *Kevent) bool { return evt.StackID() == e.StackID() })
418-
if i == -1 {
426+
427+
id := e.StackID()
428+
q, ok := cd.buckets[id]
429+
if !ok {
419430
return e
420431
}
421-
evt := cd.deq.Remove(i)
432+
433+
var evt *Kevent
434+
if len(q) > 0 {
435+
evt, cd.buckets[id] = q[0], q[1:]
436+
}
437+
438+
if evt == nil {
439+
return e
440+
}
441+
422442
callstack := e.Kparams.MustGetSlice(kparams.Callstack)
423443
evt.AppendParam(kparams.Callstack, kparams.Slice, callstack)
444+
424445
return evt
425446
}
426447

@@ -429,6 +450,13 @@ func (cd *CallstackDecorator) Stop() {
429450
cd.quit <- struct{}{}
430451
}
431452

453+
// RemoveBucket removes the bucket and all enqueued events.
454+
func (cd *CallstackDecorator) RemoveBucket(id uint64) {
455+
cd.mux.Lock()
456+
defer cd.mux.Unlock()
457+
delete(cd.buckets, id)
458+
}
459+
432460
func (cd *CallstackDecorator) doFlush() {
433461
for {
434462
select {
@@ -449,20 +477,26 @@ func (cd *CallstackDecorator) doFlush() {
449477
func (cd *CallstackDecorator) flush() []error {
450478
cd.mux.Lock()
451479
defer cd.mux.Unlock()
452-
if cd.deq.Len() == 0 {
480+
481+
if len(cd.buckets) == 0 {
453482
return nil
454483
}
484+
455485
errs := make([]error, 0)
456-
for i := 0; i < cd.deq.Len(); i++ {
457-
evt := cd.deq.At(i)
458-
if time.Since(evt.Timestamp) < maxDequeFlushPeriod {
459-
continue
460-
}
461-
callstackFlushes.Add(1)
462-
err := cd.q.push(cd.deq.Remove(i))
463-
if err != nil {
464-
errs = append(errs, err)
486+
487+
for id, q := range cd.buckets {
488+
for i, evt := range q {
489+
if time.Since(evt.Timestamp) < maxDequeFlushPeriod {
490+
continue
491+
}
492+
callstackFlushes.Add(1)
493+
err := cd.q.push(evt)
494+
if err != nil {
495+
errs = append(errs, err)
496+
}
497+
cd.buckets[id] = append(q[:i], q[i+1:]...)
465498
}
466499
}
500+
467501
return errs
468502
}

pkg/kevent/callstack_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func TestCallstackDecorator(t *testing.T) {
113113
cd.Push(e)
114114
cd.Push(e1)
115115

116-
assert.True(t, cd.deq.Len() == 2)
116+
assert.Len(t, cd.buckets[e.StackID()], 2)
117117

118118
sw := &Kevent{
119119
Type: ktypes.StackWalk,
@@ -129,7 +129,7 @@ func TestCallstackDecorator(t *testing.T) {
129129
}
130130

131131
evt := cd.Pop(sw)
132-
assert.True(t, cd.deq.Len() == 1)
132+
assert.Len(t, cd.buckets[e.StackID()], 1)
133133
assert.Equal(t, ktypes.CreateFile, evt.Type)
134134
assert.True(t, evt.Kparams.Contains(kparams.Callstack))
135135
assert.Equal(t, "C:\\Windows\\system32\\user32.dll", evt.GetParamAsString(kparams.FilePath))
@@ -164,11 +164,11 @@ func TestCallstackDecoratorFlush(t *testing.T) {
164164
}
165165

166166
cd.Push(e)
167-
assert.True(t, cd.deq.Len() == 1)
167+
assert.Len(t, cd.buckets[e.StackID()], 1)
168168
time.Sleep(time.Millisecond * 3100)
169169

170170
evt := <-q.Events()
171-
assert.True(t, cd.deq.Len() == 0)
171+
assert.Len(t, cd.buckets[e.StackID()], 0)
172172
assert.Equal(t, ktypes.CreateFile, evt.Type)
173173
assert.False(t, evt.Kparams.Contains(kparams.Callstack))
174174
}

pkg/kevent/queue.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ func (q *Queue) push(e *Kevent) error {
157157
enqueue = true
158158
}
159159
}
160+
if q.stackEnrichment && e.IsTerminateThread() {
161+
id := uint64(e.Kparams.MustGetPid() + e.Kparams.MustGetTid())
162+
q.cd.RemoveBucket(id)
163+
}
160164
if enqueue || len(q.listeners) == 0 {
161165
q.q <- e
162166
keventsEnqueued.Add(1)

0 commit comments

Comments
 (0)