Skip to content
Merged
27 changes: 25 additions & 2 deletions plugins/container/go-worker/pkg/container/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,28 @@ func (c *criEngine) List(ctx context.Context) ([]event.Event, error) {
return evts, nil
}

// Set up container created event loop by call to GetContainerEvents of the criEngine client
// In case events have been disabled in the criEngine,
// an error will be captured and passed to the caller
func (c *criEngine) Listen(ctx context.Context, wg *sync.WaitGroup) (<-chan event.Event, error) {
containerEventsCh := make(chan *v1.ContainerEventResponse)
containerEventsErrorCh := make(chan error)
wg.Add(1)
go func() {
defer close(containerEventsCh)
defer close(containerEventsErrorCh)
defer wg.Done()
_ = c.client.GetContainerEvents(ctx, containerEventsCh, nil)
containerEventsErrorCh <- c.client.GetContainerEvents(ctx, containerEventsCh, nil)
}()

// Catch error on initialization containerEventsErrorCh
select {
case err := <-containerEventsErrorCh:
return nil, err
case <-time.After(containerEventsErrorTimeout):
break
}

outCh := make(chan event.Event)
wg.Add(1)
go func() {
Expand All @@ -461,7 +475,16 @@ func (c *criEngine) Listen(ctx context.Context, wg *sync.WaitGroup) (<-chan even
select {
case <-ctx.Done():
return
case evt := <-containerEventsCh:
case _, ok := <- containerEventsErrorCh:
if !ok {
// containerEventsErrorCh has been closed - block further reads from channel
containerEventsErrorCh = nil
}
case evt, ok := <-containerEventsCh:
if !ok {
// containerEventsCh has been closed - block further reads from channel
containerEventsCh = nil
}
if evt == nil {
// Nothing to do for nil event
break
Expand Down
2 changes: 2 additions & 0 deletions plugins/container/go-worker/pkg/container/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
typeCri engineType = "cri"
typeCrio engineType = "cri-o"
typeContainerd engineType = "containerd"

containerEventsErrorTimeout = 10 * time.Millisecond
)

type engineType string
Expand Down
31 changes: 29 additions & 2 deletions plugins/container/go-worker/pkg/container/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/falcosecurity/plugins/plugins/container/go-worker/pkg/event"
"sync"
"time"
)

/*
Expand All @@ -17,13 +18,13 @@ FetcherChan requests are published through a CGO exposed API: AskForContainerInf
type fetcher struct {
getters []getter
ctx context.Context
fetcherChan <-chan string
fetcherChan chan string
}

// NewFetcherEngine returns a fetcher engine.
// The fetcher engine is responsible to allow us to get() single container
// trying all container engines enabled.
func NewFetcherEngine(_ context.Context, fetcherChan <-chan string, containerEngines []Engine) Engine {
func NewFetcherEngine(_ context.Context, fetcherChan chan string, containerEngines []Engine) Engine {
f := fetcher{
getters: make([]getter, len(containerEngines)),
// Since podman relies upon context to store
Expand Down Expand Up @@ -60,26 +61,52 @@ func (f *fetcher) List(_ context.Context) ([]event.Event, error) {
panic("do not call")
}

// Everytime a containerID is published on the fetcher channel, the fetcher engine loops
// over all enabled engines and tries to get info about the container.
// In case the container info is missing, due to a timing issue of the underlying engines
// a retry is set up every containerFetchRetryInterval until containerFetchRetryTimeout is reached.
// On success, publish event on output channel.
func (f *fetcher) Listen(ctx context.Context, wg *sync.WaitGroup) (<-chan event.Event, error) {
const containerFetchRetryInterval = 30 * time.Millisecond
const containerFetchRetryTimeout = 150 * time.Millisecond
outCh := make(chan event.Event)
wg.Add(1)
go func() {
defer func() {
close(outCh)
wg.Done()
}()
containerFirstSeen := make(map[string]time.Time)
for {
select {
case <-ctx.Done():
return
case containerId := <-f.fetcherChan:
found := false
now := time.Now()
if containerRequestTime, exists := containerFirstSeen[containerId]; exists {
if now.Sub(containerRequestTime) > containerFetchRetryTimeout {
delete(containerFirstSeen, containerId)
break
}
} else {
containerFirstSeen[containerId] = now
}
for _, e := range f.getters {
evt, _ := e.get(f.ctx, containerId)
if evt != nil {
outCh <- *evt
found = true
delete(containerFirstSeen, containerId)
break
}
}
if !found {
go func() {
time.Sleep(containerFetchRetryInterval)
f.fetcherChan <- containerId
}()
}
}
}
}()
Expand Down
28 changes: 26 additions & 2 deletions plugins/container/go-worker/pkg/container/podman.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strconv"
"strings"
"sync"
"time"
)

func init() {
Expand Down Expand Up @@ -252,6 +253,8 @@ func (pc *podmanEngine) List(_ context.Context) ([]event.Event, error) {
return evts, nil
}

// Set up container created event listener by call to system.Events
// In case events have been disabled in the podmanEngine an error will be captured and passed to the caller
func (pc *podmanEngine) Listen(ctx context.Context, wg *sync.WaitGroup) (<-chan event.Event, error) {
stream := true

Expand All @@ -268,17 +271,29 @@ func (pc *podmanEngine) Listen(ctx context.Context, wg *sync.WaitGroup) (<-chan
filters["event"] = append(filters["event"], string(events.ActionRemove))

evChn := make(chan types.Event)
evErrorChn := make(chan error)
cancelChan := make(chan bool)
wg.Add(1)
// producers
go func(ch chan types.Event) {
defer close(evErrorChn)
defer wg.Done()
_ = system.Events(pc.pCtx, ch, cancelChan, &system.EventsOptions{
evErrorChn <- system.Events(pc.pCtx, ch, cancelChan, &system.EventsOptions{
Filters: filters,
Stream: &stream,
})
}(evChn)

// Catch error on initialization of evChn
select {
case err := <-evErrorChn:
if err != nil {
return nil, err
}
case <-time.After(containerEventsErrorTimeout):
break
}

outCh := make(chan event.Event)
wg.Add(1)
go func() {
Expand All @@ -292,11 +307,20 @@ func (pc *podmanEngine) Listen(ctx context.Context, wg *sync.WaitGroup) (<-chan
select {
case <-ctx.Done():
return
case ev := <-evChn:
case _, ok := <-evErrorChn:
if !ok {
// evErrorChn has been closed - block further reads from channel
evErrorChn = nil
}
case ev, ok := <-evChn:
var (
ctr *define.InspectContainerData
err error
)
if !ok {
// evChn has been closed - block further reads from channel
evChn = nil
}
switch ev.Action {
case events.ActionCreate, events.ActionStart:
ctr, err = containers.Inspect(pc.pCtx, ev.Actor.ID, &containers.InspectOptions{Size: &size})
Expand Down
15 changes: 12 additions & 3 deletions plugins/container/go-worker/worker_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func StartWorker(cb C.async_cb, initCfg *C.cchar_t, enabledSocks **C.cchar_t) un
pluginCtx PluginCtx
ctx context.Context
)
const fetchChSize = 100
ctx, pluginCtx.ctxCancel = context.WithCancel(context.Background())

// See https://github.com/enobufs/go-calls-c-pointer/blob/master/counter_api.go
Expand Down Expand Up @@ -81,7 +82,7 @@ func StartWorker(cb C.async_cb, initCfg *C.cchar_t, enabledSocks **C.cchar_t) un
}
}

pluginCtx.fetchCh = make(chan string)
pluginCtx.fetchCh = make(chan string, fetchChSize)

// Always append the dummy engine that is required to
// be able to fetch container infos on the fly given other enabled engines.
Expand Down Expand Up @@ -118,12 +119,20 @@ func StopWorker(pCtx unsafe.Pointer) {
}

//export AskForContainerInfo
func AskForContainerInfo(pCtx unsafe.Pointer, containerId *C.cchar_t) {
func AskForContainerInfo(pCtx unsafe.Pointer, containerId *C.cchar_t) bool {
h := (*cgo.Handle)(pCtx)
pluginCtx := h.Value().(*PluginCtx)

containerID := C.GoString(containerId)
if pluginCtx.fetchCh != nil {
pluginCtx.fetchCh <- containerID
select {
case pluginCtx.fetchCh <- containerID:
return true
default:
return false
}
}
// In case the fetch channel is nil a retry from falco
// does not make sense, report the containerId as handled
return true
}
15 changes: 13 additions & 2 deletions plugins/container/src/plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,20 @@ void my_plugin::on_new_process(const falcosecurity::table_entry& thread_entry,
"container {}",
container_id),
falcosecurity::_internal::SS_PLUGIN_LOG_SEV_DEBUG);
m_asked_containers.insert(container_id);
// Implemented by GO worker.go
AskForContainerInfo(m_async_ctx, container_id.c_str());
if(AskForContainerInfo(m_async_ctx, container_id.c_str()))
{
m_asked_containers.insert(container_id);
}
else
{
m_logger.log(
fmt::format("failed to ask the plugin to fetch "
"info for "
"container {}",
container_id),
falcosecurity::_internal::SS_PLUGIN_LOG_SEV_DEBUG);
}
}
#endif
}
Expand Down
Loading