Skip to content

Commit f4b357c

Browse files
committed
feat(instrumentation,telemetry): Incorporate thread pool telemetry
The thread pool telemetry is captured from the dedicated thread pool provider. Presently, we collect three different event types including the call stacks. More specifically, SubmitThreadpoolWork, SubmitThreadpoolCallback, and SetThreadpoolTimer events. Additionally, the callback addresses are symbolized to derive to function name, and when the callback is the ZwContinue or RtlCaputreContext function call, we also try to decode the CONTEXT structure and symbolize the instruction pointer address.
1 parent 21eb54b commit f4b357c

23 files changed

+546
-63
lines changed

configs/fibratus.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,9 @@ kstream:
229229
# Determines whether DNS client events are collected
230230
#enable-dns: true
231231

232+
# Determines whether thread pool events are collected
233+
#enable-threadpool: true
234+
232235
# Indicates if stack enrichment is enabled for eligible events
233236
#stack-enrichment: true
234237

internal/etw/source.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func (e *EventSource) Open(config *config.Config) error {
147147
config.Kstream.EnableMemKevents = config.Kstream.EnableMemKevents && (e.r.HasMemEvents || (config.Yara.Enabled && !config.Yara.SkipAllocs))
148148
config.Kstream.EnableDNSEvents = config.Kstream.EnableDNSEvents && e.r.HasDNSEvents
149149
config.Kstream.EnableAuditAPIEvents = config.Kstream.EnableAuditAPIEvents && e.r.HasAuditAPIEvents
150+
config.Kstream.EnableThreadpoolEvents = config.Kstream.EnableThreadpoolEvents && e.r.HasThreadpoolEvents
150151
for _, ktype := range ktypes.All() {
151152
if ktype == ktypes.CreateProcess || ktype == ktypes.TerminateProcess ||
152153
ktype == ktypes.LoadImage || ktype == ktypes.UnloadImage {
@@ -189,6 +190,9 @@ func (e *EventSource) Open(config *config.Config) error {
189190
if config.Kstream.EnableAuditAPIEvents {
190191
e.addTrace(etw.KernelAuditAPICallsSession, etw.KernelAuditAPICallsGUID)
191192
}
193+
if config.Kstream.EnableThreadpoolEvents {
194+
e.addTrace(etw.ThreadpoolSession, etw.ThreadpoolGUID)
195+
}
192196

193197
for _, trace := range e.traces {
194198
err := trace.Start()
@@ -226,6 +230,7 @@ func (e *EventSource) Open(config *config.Config) error {
226230
// Init consumer and open the trace for processing
227231
consumer := NewConsumer(e.psnap, e.hsnap, config, e.sequencer, e.evts)
228232
consumer.SetFilter(e.filter)
233+
229234
// Attach event listeners
230235
for _, lis := range e.listeners {
231236
consumer.q.RegisterListener(lis)

internal/etw/stackext.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,12 @@ func (s *StackExtensions) EnableMemoryCallstack() {
100100
s.AddStackTracing(ktypes.VirtualAlloc)
101101
}
102102
}
103+
104+
// EnableThreadpoolCallstack enables stack tracing for thread pool events.
105+
func (s *StackExtensions) EnableThreadpoolCallstack() {
106+
if s.config.EnableThreadpoolEvents {
107+
s.AddStackTracing(ktypes.SubmitThreadpoolWork)
108+
s.AddStackTracing(ktypes.SubmitThreadpoolCallback)
109+
s.AddStackTracing(ktypes.SetThreadpoolTimer)
110+
}
111+
}

internal/etw/trace.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ func (t *Trace) enableCallstacks() {
150150
if t.IsSystemRegistryTrace() {
151151
t.stackExtensions.EnableRegistryCallstack()
152152
}
153+
154+
if t.IsThreadpoolTrace() {
155+
t.stackExtensions.EnableThreadpoolCallstack()
156+
}
153157
}
154158

155159
// Start registers and starts an event tracing session.
@@ -202,7 +206,9 @@ func (t *Trace) Start() error {
202206
log.Warnf("unable to set empty system flags: %v", err)
203207
return nil
204208
}
209+
205210
sysTraceFlags[0] = flags
211+
206212
// enable object manager tracking
207213
if cfg.EnableHandleKevents {
208214
sysTraceFlags[4] = etw.Handle
@@ -225,13 +231,14 @@ func (t *Trace) Start() error {
225231
// enrichment is enabled, it is necessary to instruct the provider
226232
// to emit stack addresses in the extended data item section when
227233
// writing events to the session buffers
228-
if cfg.StackEnrichment && !t.IsSystemProvider() {
234+
if cfg.StackEnrichment && !t.IsSystemProvider() && !t.IsThreadpoolTrace() {
229235
return etw.EnableTraceWithOpts(t.GUID, t.startHandle, t.Keywords, etw.EnableTraceOpts{WithStacktrace: true})
230236
} else if cfg.StackEnrichment && len(t.stackExtensions.EventIds()) > 0 {
231237
if err := etw.EnableStackTracing(t.startHandle, t.stackExtensions.EventIds()); err != nil {
232238
return fmt.Errorf("fail to enable system events callstack tracing: %v", err)
233239
}
234240
}
241+
235242
if t.IsSystemRegistryTrace() {
236243
if err := etw.EnableTrace(t.GUID, t.startHandle, t.Keywords); err != nil {
237244
return err
@@ -249,6 +256,7 @@ func (t *Trace) Start() error {
249256
sysTraceFlags[0] = etw.Registry
250257
return etw.SetTraceSystemFlags(handle, sysTraceFlags)
251258
}
259+
252260
return etw.EnableTrace(t.GUID, t.startHandle, t.Keywords)
253261
}
254262

@@ -343,6 +351,9 @@ func (t *Trace) IsKernelTrace() bool { return t.GUID == etw.KernelTraceControlGU
343351
// IsSystemRegistryTrace determines if this is the system registry logger trace.
344352
func (t *Trace) IsSystemRegistryTrace() bool { return t.GUID == etw.SystemRegistryProviderID }
345353

354+
// IsThreadpoolTrace determines if this is the thread pool logger trace.
355+
func (t *Trace) IsThreadpoolTrace() bool { return t.GUID == etw.ThreadpoolGUID }
356+
346357
// IsSystemProvider determines if this is one of the granular system provider traces.
347358
func (t *Trace) IsSystemProvider() bool {
348359
return t.GUID == etw.SystemIOProviderID || t.GUID == etw.SystemRegistryProviderID || t.GUID == etw.SystemProcessProviderID || t.GUID == etw.SystemMemoryProviderID

pkg/config/config_windows.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ func (c *Config) addFlags() {
411411
c.flags.Bool(enableMemKevents, true, "Determines whether memory manager kernel events are collected by Kernel Logger provider")
412412
c.flags.Bool(enableAuditAPIEvents, true, "Determines whether kernel audit API calls events are published")
413413
c.flags.Bool(enableDNSEvents, true, "Determines whether DNS client events are enabled")
414+
c.flags.Bool(enableThreadpoolEvents, true, "Determines whether thread pool events are published")
414415
c.flags.Bool(stackEnrichment, true, "Indicates if stack enrichment is enabled for eligible events")
415416
c.flags.Int(bufferSize, int(maxBufferSize), "Represents the amount of memory allocated for each event tracing session buffer, in kilobytes. The buffer size affects the rate at which buffers fill and must be flushed (small buffer size requires less memory but it increases the rate at which buffers must be flushed)")
416417
c.flags.Int(minBuffers, int(defaultMinBuffers), "Determines the minimum number of buffers allocated for the event tracing session's buffer pool")

pkg/config/filters.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -170,19 +170,20 @@ func (ctx *ActionContext) UniquePids() []uint32 {
170170
// enabling/disabling event providers/types
171171
// dynamically.
172172
type RulesCompileResult struct {
173-
HasProcEvents bool
174-
HasThreadEvents bool
175-
HasImageEvents bool
176-
HasFileEvents bool
177-
HasNetworkEvents bool
178-
HasRegistryEvents bool
179-
HasHandleEvents bool
180-
HasMemEvents bool
181-
HasVAMapEvents bool
182-
HasDNSEvents bool
183-
HasAuditAPIEvents bool
184-
UsedEvents []ktypes.Ktype
185-
NumberRules int
173+
HasProcEvents bool
174+
HasThreadEvents bool
175+
HasImageEvents bool
176+
HasFileEvents bool
177+
HasNetworkEvents bool
178+
HasRegistryEvents bool
179+
HasHandleEvents bool
180+
HasMemEvents bool
181+
HasVAMapEvents bool
182+
HasDNSEvents bool
183+
HasAuditAPIEvents bool
184+
HasThreadpoolEvents bool
185+
UsedEvents []ktypes.Ktype
186+
NumberRules int
186187
}
187188

188189
func (r RulesCompileResult) ContainsEvent(ktype ktypes.Ktype) bool {
@@ -217,6 +218,7 @@ func (r RulesCompileResult) String() string {
217218
HasVAMapEvents: %t
218219
HasAuditAPIEvents: %t
219220
HasDNSEvents: %t
221+
HasThreadpoolEvents: %t
220222
Events: %s`,
221223
r.HasProcEvents,
222224
r.HasThreadEvents,
@@ -229,6 +231,7 @@ func (r RulesCompileResult) String() string {
229231
r.HasVAMapEvents,
230232
r.HasAuditAPIEvents,
231233
r.HasDNSEvents,
234+
r.HasThreadpoolEvents,
232235
strings.Join(events, ", "),
233236
)
234237
}

pkg/config/kstream.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,22 @@ import (
3232
)
3333

3434
const (
35-
enableThreadKevents = "kstream.enable-thread"
36-
enableRegistryKevents = "kstream.enable-registry"
37-
enableNetKevents = "kstream.enable-net"
38-
enableFileIOKevents = "kstream.enable-fileio"
39-
enableVAMapKevents = "kstream.enable-vamap"
40-
enableImageKevents = "kstream.enable-image"
41-
enableHandleKevents = "kstream.enable-handle"
42-
enableMemKevents = "kstream.enable-mem"
43-
enableAuditAPIEvents = "kstream.enable-audit-api"
44-
enableDNSEvents = "kstream.enable-dns"
45-
stackEnrichment = "kstream.stack-enrichment"
46-
bufferSize = "kstream.buffer-size"
47-
minBuffers = "kstream.min-buffers"
48-
maxBuffers = "kstream.max-buffers"
49-
flushInterval = "kstream.flush-interval"
35+
enableThreadKevents = "kstream.enable-thread"
36+
enableRegistryKevents = "kstream.enable-registry"
37+
enableNetKevents = "kstream.enable-net"
38+
enableFileIOKevents = "kstream.enable-fileio"
39+
enableVAMapKevents = "kstream.enable-vamap"
40+
enableImageKevents = "kstream.enable-image"
41+
enableHandleKevents = "kstream.enable-handle"
42+
enableMemKevents = "kstream.enable-mem"
43+
enableAuditAPIEvents = "kstream.enable-audit-api"
44+
enableDNSEvents = "kstream.enable-dns"
45+
enableThreadpoolEvents = "kstream.enable-threadpool"
46+
stackEnrichment = "kstream.stack-enrichment"
47+
bufferSize = "kstream.buffer-size"
48+
minBuffers = "kstream.min-buffers"
49+
maxBuffers = "kstream.max-buffers"
50+
flushInterval = "kstream.flush-interval"
5051

5152
excludedEvents = "kstream.blacklist.events"
5253
excludedImages = "kstream.blacklist.images"
@@ -82,6 +83,8 @@ type KstreamConfig struct {
8283
EnableAuditAPIEvents bool `json:"enable-audit-api" yaml:"enable-audit-api"`
8384
// EnableDNSEvents indicates if DNS client events are enabled
8485
EnableDNSEvents bool `json:"enable-dns" yaml:"enable-dns"`
86+
// EnableThreadpoolEvents indicates if thread pool events are enabled
87+
EnableThreadpoolEvents bool `json:"enable-threadpool" yaml:"enable-threadpool"`
8588
// StackEnrichment indicates if stack enrichment is enabled for eligible events.
8689
StackEnrichment bool `json:"stack-enrichment" yaml:"stack-enrichment"`
8790
// BufferSize represents the amount of memory allocated for each event tracing session buffer, in kilobytes.
@@ -115,6 +118,7 @@ func (c *KstreamConfig) initFromViper(v *viper.Viper) {
115118
c.EnableMemKevents = v.GetBool(enableMemKevents)
116119
c.EnableAuditAPIEvents = v.GetBool(enableAuditAPIEvents)
117120
c.EnableDNSEvents = v.GetBool(enableDNSEvents)
121+
c.EnableThreadpoolEvents = v.GetBool(enableThreadpoolEvents)
118122
c.StackEnrichment = v.GetBool(stackEnrichment)
119123
c.BufferSize = uint32(v.GetInt(bufferSize))
120124
c.MinBuffers = uint32(v.GetInt(minBuffers))

pkg/config/schema_windows.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -179,25 +179,26 @@ var schema = `
179179
"kstream": {
180180
"type": "object",
181181
"properties": {
182-
"enable-thread": {"type": "boolean"},
183-
"enable-image": {"type": "boolean"},
184-
"enable-registry": {"type": "boolean"},
185-
"enable-fileio": {"type": "boolean"},
186-
"enable-vamap": {"type": "boolean"},
187-
"enable-handle": {"type": "boolean"},
188-
"enable-net": {"type": "boolean"},
189-
"enable-mem": {"type": "boolean"},
190-
"enable-audit-api": {"type": "boolean"},
191-
"enable-dns": {"type": "boolean"},
192-
"stack-enrichment": {"type": "boolean"},
193-
"min-buffers": {"type": "integer", "minimum": 1, "maximum": {{ .MinBuffers }}},
194-
"max-buffers": {"type": "integer", "minimum": 2, "maximum": {{ .MaxBuffers }}},
195-
"buffer-size": {"type": "integer", "maximum": {{ .MaxBufferSize }}},
182+
"enable-thread": {"type": "boolean"},
183+
"enable-image": {"type": "boolean"},
184+
"enable-registry": {"type": "boolean"},
185+
"enable-fileio": {"type": "boolean"},
186+
"enable-vamap": {"type": "boolean"},
187+
"enable-handle": {"type": "boolean"},
188+
"enable-net": {"type": "boolean"},
189+
"enable-mem": {"type": "boolean"},
190+
"enable-audit-api": {"type": "boolean"},
191+
"enable-dns": {"type": "boolean"},
192+
"enable-threadpool": {"type": "boolean"},
193+
"stack-enrichment": {"type": "boolean"},
194+
"min-buffers": {"type": "integer", "minimum": 1, "maximum": {{ .MinBuffers }}},
195+
"max-buffers": {"type": "integer", "minimum": 2, "maximum": {{ .MaxBuffers }}},
196+
"buffer-size": {"type": "integer", "maximum": {{ .MaxBufferSize }}},
196197
"flush-interval": {"type": "string", "minLength": 2, "pattern": "[0-9]+s"},
197198
"blacklist": {
198199
"type": "object",
199200
"properties": {
200-
"events": {"type": "array", "items": {"type": "string", "enum": ["CreateThread", "TerminateThread", "OpenProcess", "OpenThread", "SetThreadContext", "LoadImage", "UnloadImage", "CreateFile", "CloseFile", "ReadFile", "WriteFile", "DeleteFile", "RenameFile", "SetFileInformation", "EnumDirectory", "MapViewFile", "UnmapViewFile", "RegCreateKey", "RegOpenKey", "RegSetValue", "RegQueryValue", "RegQueryKey", "RegDeleteKey", "RegDeleteValue", "RegCloseKey", "Accept", "Send", "Recv", "Connect", "Disconnect", "Reconnect", "Retransmit", "CreateHandle", "CloseHandle", "DuplicateHandle", "QueryDns", "ReplyDns", "VirtualAlloc", "VirtualFree", "CreateSymbolicLinkObject"]}},
201+
"events": {"type": "array", "items": {"type": "string", "enum": ["CreateThread", "TerminateThread", "OpenProcess", "OpenThread", "SetThreadContext", "LoadImage", "UnloadImage", "CreateFile", "CloseFile", "ReadFile", "WriteFile", "DeleteFile", "RenameFile", "SetFileInformation", "EnumDirectory", "MapViewFile", "UnmapViewFile", "RegCreateKey", "RegOpenKey", "RegSetValue", "RegQueryValue", "RegQueryKey", "RegDeleteKey", "RegDeleteValue", "RegCloseKey", "Accept", "Send", "Recv", "Connect", "Disconnect", "Reconnect", "Retransmit", "CreateHandle", "CloseHandle", "DuplicateHandle", "QueryDns", "ReplyDns", "VirtualAlloc", "VirtualFree", "CreateSymbolicLinkObject", "SubmitThreadpoolWork", "SubmitThreadpoolCallback", "SetThreadpoolTimer"]}},
201202
"images": {"type": "array", "items": {"type": "string", "minLength": 1}}
202203
},
203204
"additionalProperties": false

pkg/filter/rules.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,8 @@ func (r *Rules) buildCompileResult() *config.RulesCompileResult {
647647
rs.HasMemEvents = true
648648
case ktypes.Handle:
649649
rs.HasHandleEvents = true
650+
case ktypes.Threadpool:
651+
rs.HasThreadpoolEvents = true
650652
}
651653
if typ == ktypes.MapViewFile || typ == ktypes.UnmapViewFile {
652654
rs.HasVAMapEvents = true

pkg/kevent/kevent_windows.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,16 @@ func (e *Kevent) Summary() string {
556556
case ktypes.ReplyDNS:
557557
dnsName := e.GetParamAsString(kparams.DNSName)
558558
return printSummary(e, fmt.Sprintf("received DNS response for <code>%s</code> query", dnsName))
559+
case ktypes.CreateSymbolicLinkObject:
560+
src := e.GetParamAsString(kparams.LinkSource)
561+
target := e.GetParamAsString(kparams.LinkTarget)
562+
return printSummary(e, fmt.Sprintf("created symbolic link from %s to %s", src, target))
563+
case ktypes.SubmitThreadpoolWork:
564+
return printSummary(e, "enqueued the work item to the thread pool")
565+
case ktypes.SubmitThreadpoolCallback:
566+
return printSummary(e, "Submitted the thread pool callback for execution within the work item")
567+
case ktypes.SetThreadpoolTimer:
568+
return printSummary(e, "set thread pool timer object")
559569
}
560570
return ""
561571
}

0 commit comments

Comments
 (0)