Skip to content

Commit 0613d65

Browse files
RichardoCclaude
andcommitted
refactor(k8saudit): use event-driven file watching with fsnotify
Replace polling-based file watching with fsnotify for better efficiency. Key changes: - Use fsnotify to watch parent directory (per maintainer recommendation) - Rename scheme from tail:// to file:// - Remove watchPollIntervalMs config (no longer needed) - Rename test package from tail to filewatch Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 2b7e054 commit 0613d65

File tree

6 files changed

+98
-112
lines changed

6 files changed

+98
-112
lines changed

plugins/k8saudit/README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Audit events are logged by the API server when almost every cluster management t
99

1010
This plugin supports consuming Kubernetes Audit Events coming from the [Webhook backend](https://kubernetes.io/docs/tasks/debug/debug-cluster/audit/#webhook-backend) or from a file. For webhooks, the plugin embeds a web server that listens on a configurable port and accepts POST requests. The posted JSON object comprises one or more events. The web server of the plugin can be configured as part of the plugin's init configuration and open parameters. For files, the plugin expects content to be in [JSONL format](https://jsonlines.org/), where each line represents a JSON object, containing one or more audit events.
1111

12-
The expected way of using the plugin with Falco is through a Webhook. File reading support can be used with Stratoshark or testing and development. The `tail://` scheme enables continuous file watching with log rotation support, useful for reading audit logs written to disk by the API server.
12+
The expected way of using the plugin with Falco is through a Webhook. File reading support can be used with Stratoshark or testing and development. The `file://` scheme enables continuous file watching with log rotation support, useful for reading audit logs written to disk by the API server.
1313

1414
## Capabilities
1515

@@ -131,12 +131,11 @@ load_plugins: [k8saudit, json]
131131
- `maxEventSize`: Maximum size of single audit event (Default: 262144)
132132
- `webhookMaxBatchSize`: Maximum size of incoming webhook POST request bodies (Default: 12582912)
133133
- `useAsync`: If true, then async extraction optimization is enabled (Default: true)
134-
- `watchPollIntervalMs`: Polling interval in milliseconds when watching a file with the `tail://` scheme (Default: 250)
135134

136135
**Open Parameters**:
137136
- `http://<host>:<port>/<endpoint>`: Opens an event stream by listening on an HTTP web server
138137
- `https://<host>:<port>/<endpoint>`: Opens an event stream by listening on an HTTPS web server
139-
- `tail://<filepath>`: Opens an event stream by continuously watching a file for new audit events, similar to `tail -f`. Handles log rotation (inode changes) and file truncation automatically. Example: `tail:///var/log/kube-apiserver/audit.log`
138+
- `file://<filepath>`: Opens an event stream by continuously watching a file for new audit events. Handles log rotation automatically. Example: `file:///var/log/kube-apiserver/audit.log`
140139
- `no scheme`: Opens an event stream by reading the events from a file on the local filesystem. The params string is interpreted as a filepath
141140

142141

plugins/k8saudit/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.15
55
require (
66
github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b
77
github.com/falcosecurity/plugin-sdk-go v0.8.3
8+
github.com/fsnotify/fsnotify v1.9.0 // indirect
89
github.com/iancoleman/orderedmap v0.3.0 // indirect
910
github.com/valyala/fastjson v1.6.4
1011
)

plugins/k8saudit/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
55
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
66
github.com/falcosecurity/plugin-sdk-go v0.8.3 h1:KsX7qt83dzC57qcNpZKaBrCjTXqpXgvxDcEXs6Z5sHI=
77
github.com/falcosecurity/plugin-sdk-go v0.8.3/go.mod h1:gEgxjvuopv5VF4wc8s0EHnmT9qrIKBtcJVBnRlEPU1A=
8+
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
9+
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
810
github.com/geraldcombs/fastjson v0.0.0-20250801170450-bf39244e60b8 h1:S2FAMWjJKPRR9fvtgYVWQ5joNsl0qQoRxmxYHKDDtx4=
911
github.com/geraldcombs/fastjson v0.0.0-20250801170450-bf39244e60b8/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
1012
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA=
@@ -25,6 +27,8 @@ github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2
2527
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
2628
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
2729
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
30+
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
31+
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
2832
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
2933
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
3034
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

plugins/k8saudit/pkg/k8saudit/config.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ type PluginConfig struct {
2424
UseAsync bool `json:"useAsync" jsonschema:"title=Use async extraction,description=If true then async extraction optimization is enabled (Default: true),default=true"`
2525
MaxEventSize uint64 `json:"maxEventSize" jsonschema:"title=Maximum event size,description=Maximum size of single audit event (Default: 262144),default=262144"`
2626
WebhookMaxBatchSize uint64 `json:"webhookMaxBatchSize" jsonschema:"title=Maximum webhook request size,description=Maximum size of incoming webhook POST request bodies (Default: 12582912),default=12582912"`
27-
WatchPollIntervalMs uint64 `json:"watchPollIntervalMs" jsonschema:"title=Watch poll interval,description=Polling interval in milliseconds when watching a file with tail:// scheme (Default: 250),default=250"`
2827
}
2928

3029
// Resets sets the configuration to its default values
@@ -39,5 +38,4 @@ func (k *PluginConfig) Reset() {
3938
// The following values have been chosen by increasing by ~20% the default
4039
// values of the K8S docs
4140
k.WebhookMaxBatchSize = 12 * 1024 * 1024
42-
k.WatchPollIntervalMs = 250
4341
}

plugins/k8saudit/pkg/k8saudit/tail/tail_test.go renamed to plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go

Lines changed: 7 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
1515
limitations under the License.
1616
*/
1717

18-
package tail_test
18+
package filewatch_test
1919

2020
import (
2121
"os"
@@ -32,30 +32,10 @@ const testAuditEvent = `{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":"
3232
func newTestPlugin() *k8saudit.Plugin {
3333
p := &k8saudit.Plugin{}
3434
p.Config.Reset()
35-
p.Config.WatchPollIntervalMs = 50
3635
return p
3736
}
3837

39-
func TestOpenFileTail_ReadsExistingContent(t *testing.T) {
40-
tmpDir := t.TempDir()
41-
filePath := filepath.Join(tmpDir, "audit.log")
42-
43-
if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil {
44-
t.Fatal(err)
45-
}
46-
47-
p := newTestPlugin()
48-
inst, err := p.OpenFileTail(filePath)
49-
if err != nil {
50-
t.Fatal(err)
51-
}
52-
defer inst.(sdk.Closer).Close()
53-
54-
// Allow time for initial read
55-
time.Sleep(100 * time.Millisecond)
56-
}
57-
58-
func TestOpenFileTail_DetectsNewContent(t *testing.T) {
38+
func TestOpenFileWatch_DetectsNewContent(t *testing.T) {
5939
tmpDir := t.TempDir()
6040
filePath := filepath.Join(tmpDir, "audit.log")
6141

@@ -64,13 +44,12 @@ func TestOpenFileTail_DetectsNewContent(t *testing.T) {
6444
}
6545

6646
p := newTestPlugin()
67-
inst, err := p.OpenFileTail(filePath)
47+
inst, err := p.OpenFileWatch(filePath)
6848
if err != nil {
6949
t.Fatal(err)
7050
}
7151
defer inst.(sdk.Closer).Close()
7252

73-
// Write new content after opening
7453
time.Sleep(100 * time.Millisecond)
7554
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, 0644)
7655
if err != nil {
@@ -79,11 +58,10 @@ func TestOpenFileTail_DetectsNewContent(t *testing.T) {
7958
f.WriteString(testAuditEvent + "\n")
8059
f.Close()
8160

82-
// Allow time for polling to detect new content
8361
time.Sleep(200 * time.Millisecond)
8462
}
8563

86-
func TestOpenFileTail_HandlesRotation(t *testing.T) {
64+
func TestOpenFileWatch_HandlesRotation(t *testing.T) {
8765
tmpDir := t.TempDir()
8866
filePath := filepath.Join(tmpDir, "audit.log")
8967

@@ -92,51 +70,23 @@ func TestOpenFileTail_HandlesRotation(t *testing.T) {
9270
}
9371

9472
p := newTestPlugin()
95-
inst, err := p.OpenFileTail(filePath)
73+
inst, err := p.OpenFileWatch(filePath)
9674
if err != nil {
9775
t.Fatal(err)
9876
}
9977
defer inst.(sdk.Closer).Close()
10078

10179
time.Sleep(100 * time.Millisecond)
10280

103-
// Simulate rotation: remove and recreate with new content
10481
os.Remove(filePath)
10582
if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil {
10683
t.Fatal(err)
10784
}
10885

109-
// Allow time for polling to detect rotation
110-
time.Sleep(200 * time.Millisecond)
111-
}
112-
113-
func TestOpenFileTail_HandlesTruncation(t *testing.T) {
114-
tmpDir := t.TempDir()
115-
filePath := filepath.Join(tmpDir, "audit.log")
116-
117-
if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"+testAuditEvent+"\n"), 0644); err != nil {
118-
t.Fatal(err)
119-
}
120-
121-
p := newTestPlugin()
122-
inst, err := p.OpenFileTail(filePath)
123-
if err != nil {
124-
t.Fatal(err)
125-
}
126-
defer inst.(sdk.Closer).Close()
127-
128-
time.Sleep(100 * time.Millisecond)
129-
130-
// Truncate the file
131-
if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil {
132-
t.Fatal(err)
133-
}
134-
135-
// Allow time for polling to detect truncation
13686
time.Sleep(200 * time.Millisecond)
13787
}
13888

139-
func TestOpen_TailScheme(t *testing.T) {
89+
func TestOpen_FileScheme(t *testing.T) {
14090
tmpDir := t.TempDir()
14191
filePath := filepath.Join(tmpDir, "audit.log")
14292

@@ -145,7 +95,7 @@ func TestOpen_TailScheme(t *testing.T) {
14595
}
14696

14797
p := newTestPlugin()
148-
inst, err := p.Open("tail://" + filePath)
98+
inst, err := p.Open("file://" + filePath)
14999
if err != nil {
150100
t.Fatal(err)
151101
}

plugins/k8saudit/pkg/k8saudit/source.go

Lines changed: 84 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,17 @@ import (
2626
"net/http"
2727
"net/url"
2828
"os"
29+
"path/filepath"
2930
"sort"
3031
"strings"
31-
"syscall"
3232
"time"
3333

3434
"github.com/falcosecurity/plugin-sdk-go/pkg/sdk"
3535
"github.com/falcosecurity/plugin-sdk-go/pkg/sdk/plugins/source"
36+
"github.com/fsnotify/fsnotify"
3637
"github.com/valyala/fastjson"
3738
)
3839

39-
func fileInode(info os.FileInfo) uint64 {
40-
if stat, ok := info.Sys().(*syscall.Stat_t); ok {
41-
return stat.Ino
42-
}
43-
return 0
44-
}
45-
4640
const (
4741
webServerShutdownTimeoutSecs = 5
4842
webServerEventChanBufSize = 50
@@ -59,8 +53,8 @@ func (k *Plugin) Open(params string) (source.Instance, error) {
5953
return k.OpenWebServer(u.Host, u.Path, false)
6054
case "https":
6155
return k.OpenWebServer(u.Host, u.Path, true)
62-
case "tail":
63-
return k.OpenFileTail(u.Path)
56+
case "file":
57+
return k.OpenFileWatch(u.Path)
6458
case "": // by default, fallback to opening a filepath
6559
trimmed := strings.TrimSpace(params)
6660

@@ -135,72 +129,112 @@ func (k *Plugin) OpenReader(r io.ReadCloser) (source.Instance, error) {
135129
source.WithInstanceEventSize(uint32(k.Config.MaxEventSize)))
136130
}
137131

138-
// OpenFileTail opens a source.Instance that continuously watches a file for
139-
// new K8S Audit Events, similar to "tail -f". It handles log rotation by
140-
// detecting file truncation or inode changes.
141-
func (k *Plugin) OpenFileTail(path string) (source.Instance, error) {
132+
// OpenFileWatch opens a source.Instance that continuously watches a file for
133+
// new K8S Audit Events using fsnotify. It watches the parent directory (as
134+
// recommended by fsnotify) to handle atomic file replacements and log rotation.
135+
func (k *Plugin) OpenFileWatch(path string) (source.Instance, error) {
136+
absPath, err := filepath.Abs(path)
137+
if err != nil {
138+
return nil, fmt.Errorf("failed to get absolute path: %w", err)
139+
}
140+
dir := filepath.Dir(absPath)
141+
142+
watcher, err := fsnotify.NewWatcher()
143+
if err != nil {
144+
return nil, fmt.Errorf("failed to create file watcher: %w", err)
145+
}
146+
142147
ctx, cancelCtx := context.WithCancel(context.Background())
143148
evtC := make(chan source.PushEvent)
144149

145150
go func() {
146151
defer close(evtC)
152+
defer watcher.Close()
153+
147154
var parser fastjson.Parser
155+
var file *os.File
148156
var offset int64
149-
var lastInode uint64
150-
151-
pollInterval := time.Duration(k.Config.WatchPollIntervalMs) * time.Millisecond
152-
153-
for {
154-
select {
155-
case <-ctx.Done():
156-
return
157-
default:
158-
}
159157

160-
file, err := os.Open(path)
161-
if err != nil {
162-
select {
163-
case <-ctx.Done():
164-
return
165-
case <-time.After(pollInterval):
166-
continue
167-
}
158+
openFile := func(seekEnd bool) bool {
159+
if file != nil {
160+
file.Close()
161+
file = nil
168162
}
169-
170-
info, err := file.Stat()
163+
f, err := os.Open(absPath)
171164
if err != nil {
172-
file.Close()
173-
continue
165+
return false
174166
}
175-
176-
inode := fileInode(info)
177-
if lastInode != 0 && inode != lastInode {
178-
offset = 0 // file rotated (new inode)
179-
} else if info.Size() < offset {
180-
offset = 0 // file truncated
167+
file = f
168+
if seekEnd {
169+
offset, _ = file.Seek(0, io.SeekEnd)
170+
} else {
171+
offset = 0
181172
}
182-
lastInode = inode
173+
return true
174+
}
183175

184-
if offset > 0 {
185-
file.Seek(offset, io.SeekStart)
176+
readNewLines := func() {
177+
if file == nil {
178+
return
186179
}
187-
180+
file.Seek(offset, io.SeekStart)
188181
scanner := bufio.NewScanner(file)
189182
for scanner.Scan() {
190183
line := scanner.Text()
184+
offset += int64(len(line)) + 1
191185
if len(line) > 0 {
192186
k.parseAuditEventsAndPush(&parser, []byte(line), evtC)
193187
}
194188
}
189+
}
190+
191+
openFile(true)
195192

196-
pos, _ := file.Seek(0, io.SeekCurrent)
197-
offset = pos
198-
file.Close()
193+
if err := watcher.Add(dir); err != nil {
194+
if file != nil {
195+
file.Close()
196+
}
197+
evtC <- source.PushEvent{Err: err}
198+
return
199+
}
199200

201+
for {
200202
select {
201203
case <-ctx.Done():
204+
if file != nil {
205+
file.Close()
206+
}
202207
return
203-
case <-time.After(pollInterval):
208+
case event, ok := <-watcher.Events:
209+
if !ok {
210+
if file != nil {
211+
file.Close()
212+
}
213+
return
214+
}
215+
if event.Name != absPath {
216+
continue
217+
}
218+
if event.Op&fsnotify.Write == fsnotify.Write {
219+
readNewLines()
220+
}
221+
if event.Op&fsnotify.Create == fsnotify.Create {
222+
openFile(false)
223+
readNewLines()
224+
}
225+
if event.Op&(fsnotify.Rename|fsnotify.Remove) != 0 {
226+
if file != nil {
227+
file.Close()
228+
file = nil
229+
}
230+
}
231+
case _, ok := <-watcher.Errors:
232+
if !ok {
233+
if file != nil {
234+
file.Close()
235+
}
236+
return
237+
}
204238
}
205239
}
206240
}()

0 commit comments

Comments
 (0)