Skip to content

Commit 1c135e3

Browse files
FedeDPpoiana
authored andcommitted
chore(plugins/container): let async_ctx own the fetcher channel.
Signed-off-by: Federico Di Pierro <nierro92@gmail.com>
1 parent f212d50 commit 1c135e3

File tree

4 files changed

+33
-27
lines changed

4 files changed

+33
-27
lines changed

plugins/container/go-worker/pkg/container/fetcher.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,31 @@ import (
77
)
88

99
/*
10-
Fetcher is a fake engine that listens on fetcherChan for published containerIDs.
10+
Fetcher is a fake engine that listens on a channel for published containerIDs.
1111
Everytime a containerID is published on the channel, the fetcher engine loops
1212
over all enabled engines and tries to get info about the container,
1313
until it succeeds and publish an event to the output channel.
1414
FetcherChan requests are published through a CGO exposed API: AskForContainerInfo(), in worker_api.
1515
*/
1616

17-
var fetcherChan chan string
18-
19-
func GetFetcherChan() chan<- string {
20-
return fetcherChan
21-
}
22-
2317
type fetcher struct {
24-
getters []getter
25-
ctx context.Context
18+
getters []getter
19+
ctx context.Context
20+
fetcherChan <-chan string
2621
}
2722

2823
// NewFetcherEngine returns a fetcher engine.
2924
// The fetcher engine is responsible to allow us to get() single container
3025
// trying all container engines enabled.
31-
func NewFetcherEngine(_ context.Context, containerEngines []Engine) Engine {
26+
func NewFetcherEngine(_ context.Context, fetcherChan <-chan string, containerEngines []Engine) Engine {
3227
f := fetcher{
3328
getters: make([]getter, len(containerEngines)),
3429
// Since podman relies upon context to store
3530
// connection-related info,
3631
// we need a unique context for fetcher
3732
// to avoid tampering with real podman engine context.
38-
ctx: context.Background(),
33+
ctx: context.Background(),
34+
fetcherChan: fetcherChan,
3935
}
4036
for i, engine := range containerEngines {
4137
copyEngine, ok := engine.(copier)
@@ -67,19 +63,16 @@ func (f *fetcher) List(_ context.Context) ([]event.Event, error) {
6763
func (f *fetcher) Listen(ctx context.Context, wg *sync.WaitGroup) (<-chan event.Event, error) {
6864
outCh := make(chan event.Event)
6965
wg.Add(1)
70-
fetcherChan = make(chan string)
7166
go func() {
7267
defer func() {
7368
close(outCh)
74-
close(fetcherChan)
75-
fetcherChan = nil
7669
wg.Done()
7770
}()
7871
for {
7972
select {
8073
case <-ctx.Done():
8174
return
82-
case containerId := <-fetcherChan:
75+
case containerId := <-f.fetcherChan:
8376
for _, e := range f.getters {
8477
evt, _ := e.get(f.ctx, containerId)
8578
if evt != nil {

plugins/container/go-worker/pkg/container/fetcher_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ func TestCRIFetcher(t *testing.T) {
3232
func testFetcher(t *testing.T, containerEngine Engine, containerId string, expectedEvent event.Event) {
3333
// Create the fetcher engine with the docker engine as the only container engine
3434
containerEngines := []Engine{containerEngine}
35-
f := NewFetcherEngine(context.Background(), containerEngines)
35+
fetchCh := make(chan string)
36+
assert.NotNil(t, fetchCh)
37+
f := NewFetcherEngine(context.Background(), fetchCh, containerEngines)
3638
assert.NotNil(t, f)
3739

3840
// Check that fetcher is able to fetch the container
@@ -47,11 +49,9 @@ func testFetcher(t *testing.T, containerEngine Engine, containerId string, expec
4749
assert.NoError(t, err)
4850

4951
// Send the container ID to the fetcher channel to request its info to be loaded
50-
ch := GetFetcherChan()
51-
assert.NotNil(t, ch)
5252
go func() {
5353
time.Sleep(1 * time.Second)
54-
ch <- containerId
54+
fetchCh <- containerId
5555
}()
5656

5757
evt := waitOnChannelOrTimeout(t, listCh)

plugins/container/go-worker/worker_api.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type PluginCtx struct {
2525
ctxCancel context.CancelFunc
2626
stringBuffer ptr.StringBuffer
2727
pinner runtime.Pinner
28+
fetchCh chan string
2829
}
2930

3031
//export StartWorker
@@ -79,9 +80,12 @@ func StartWorker(cb C.async_cb, initCfg *C.cchar_t, enabledSocks **C.cchar_t) un
7980
}
8081
}
8182
}
83+
84+
pluginCtx.fetchCh = make(chan string)
85+
8286
// Always append the dummy engine that is required to
8387
// be able to fetch container infos on the fly given other enabled engines.
84-
containerEngines = append(containerEngines, container.NewFetcherEngine(ctx, containerEngines))
88+
containerEngines = append(containerEngines, container.NewFetcherEngine(ctx, pluginCtx.fetchCh, containerEngines))
8589

8690
// Store json of attached sockets in `enabledSocks`
8791
bytes, _ := json.Marshal(enabledEngines)
@@ -106,16 +110,20 @@ func StopWorker(pCtx unsafe.Pointer) {
106110
pluginCtx.ctxCancel()
107111
pluginCtx.wg.Wait()
108112
pluginCtx.stringBuffer.Free()
113+
close(pluginCtx.fetchCh)
114+
pluginCtx.fetchCh = nil
109115

110116
pluginCtx.pinner.Unpin()
111117
h.Delete()
112118
}
113119

114120
//export AskForContainerInfo
115-
func AskForContainerInfo(containerId *C.cchar_t) {
121+
func AskForContainerInfo(pCtx unsafe.Pointer, containerId *C.cchar_t) {
122+
h := (*cgo.Handle)(pCtx)
123+
pluginCtx := h.Value().(*PluginCtx)
124+
116125
containerID := C.GoString(containerId)
117-
ch := container.GetFetcherChan()
118-
if ch != nil {
119-
ch <- containerID
126+
if pluginCtx.fetchCh != nil {
127+
pluginCtx.fetchCh <- containerID
120128
}
121129
}

plugins/container/src/plugin.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,12 +372,17 @@ void my_plugin::on_new_process(const falcosecurity::table_entry& thread_entry,
372372
falcosecurity::_internal::SS_PLUGIN_LOG_SEV_DEBUG);
373373
#ifdef _HAS_ASYNC
374374
// Check if already asked
375-
if(m_asked_containers.find(container_id) ==
376-
m_asked_containers.end())
375+
if(m_async_ctx != nullptr &&
376+
m_asked_containers.find(container_id) ==
377+
m_asked_containers.end())
377378
{
379+
m_logger.log(fmt::format("asking the plugin to fetch info for "
380+
"container {}",
381+
container_id),
382+
falcosecurity::_internal::SS_PLUGIN_LOG_SEV_DEBUG);
378383
m_asked_containers.insert(container_id);
379384
// Implemented by GO worker.go
380-
AskForContainerInfo(container_id.c_str());
385+
AskForContainerInfo(m_async_ctx, container_id.c_str());
381386
}
382387
#endif
383388
}

0 commit comments

Comments
 (0)