From 149a59ca7a3895c78c3fb610b2d390927b16ccbc Mon Sep 17 00:00:00 2001 From: Flosch62 Date: Sat, 18 Oct 2025 16:11:00 +0200 Subject: [PATCH 01/10] containerlab events --- cmd/events.go | 747 +++++++++++++++++++++++++++++++++++++++++++++ cmd/options.go | 8 + cmd/root.go | 1 + docs/cmd/events.md | 50 +++ mkdocs.yml | 1 + 5 files changed, 807 insertions(+) create mode 100644 cmd/events.go create mode 100644 docs/cmd/events.md diff --git a/cmd/events.go b/cmd/events.go new file mode 100644 index 000000000..9fd2946fb --- /dev/null +++ b/cmd/events.go @@ -0,0 +1,747 @@ +package cmd + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/charmbracelet/log" + dockerTypes "github.com/docker/docker/api/types" + dockerEvents "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" + dockerClient "github.com/docker/docker/client" + "github.com/spf13/cobra" + clabconstants "github.com/srl-labs/containerlab/constants" + clabcore "github.com/srl-labs/containerlab/core" + clabruntime "github.com/srl-labs/containerlab/runtime" + clabruntimedocker "github.com/srl-labs/containerlab/runtime/docker" + clabutils "github.com/srl-labs/containerlab/utils" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netns" + "golang.org/x/sys/unix" +) + +func eventsCmd(o *Options) (*cobra.Command, error) { + c := &cobra.Command{ + Use: "events", + Short: "stream lab lifecycle and interface events", + Long: "stream docker runtime events as well as container interface updates for all running labs\n" + + "reference: https://containerlab.dev/cmd/events/", + RunE: func(cobraCmd *cobra.Command, _ []string) error { + return eventsFn(cobraCmd, o) + }, + } + + c.Flags().StringVarP( + &o.Events.Format, + "format", + "f", + o.Events.Format, + "output format. One of [plain, json]", + ) + + return c, nil +} + +func eventsFn(cobraCmd *cobra.Command, o *Options) error { + if err := clabutils.CheckAndGetRootPrivs(); err != nil { + return err + } + + ctx, cancel := context.WithCancel(cobraCmd.Context()) + defer cancel() + + c, err := clabcore.NewContainerLab(o.ToClabOptions()...) + if err != nil { + return err + } + + runtime, ok := c.Runtimes[o.Global.Runtime] + if !ok { + return fmt.Errorf("runtime %q is not initialized", o.Global.Runtime) + } + + dockerRuntime, ok := runtime.(*clabruntimedocker.DockerRuntime) + if !ok { + return fmt.Errorf("events command currently supports only the %s runtime", clabruntimedocker.RuntimeName) + } + + format := strings.TrimSpace(strings.ToLower(o.Events.Format)) + if format == "" { + format = "plain" + } + + var printer func(aggregatedEvent) + switch format { + case "plain": + printer = printAggregatedEvent + case "json": + printer = printAggregatedEventJSON + default: + return fmt.Errorf("output format %q is not supported, use 'plain' or 'json'", o.Events.Format) + } + + eventCh := make(chan aggregatedEvent, 128) + errCh := make(chan error, 1) + registry := newNetlinkRegistry(eventCh) + + containers, err := c.ListContainers(ctx, clabcore.WithListclabLabelExists()) + if err != nil { + return fmt.Errorf("failed to list containers: %w", err) + } + + for idx := range containers { + container := containers[idx] + if !isRunningContainer(&container) { + continue + } + + registry.Start(ctx, &container) + } + + go streamDockerEvents(ctx, dockerRuntime.Client, dockerRuntime, registry, eventCh, errCh) + + for { + select { + case ev := <-eventCh: + printer(ev) + case err := <-errCh: + if err == nil || errors.Is(err, context.Canceled) { + return nil + } + + return err + case <-ctx.Done(): + return nil + } + } +} + +type aggregatedEvent struct { + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` + Action string `json:"action"` + ActorID string `json:"actor_id"` + ActorName string `json:"actor_name"` + ActorFullID string `json:"actor_full_id"` + Attributes map[string]string `json:"attributes,omitempty"` +} + +func dockerMessageToEvent(msg dockerEvents.Message) aggregatedEvent { + ts := time.Unix(0, msg.TimeNano) + if ts.IsZero() { + ts = time.Unix(msg.Time, 0) + } + if ts.IsZero() { + ts = time.Now() + } + + attributes := make(map[string]string, len(msg.Actor.Attributes)+1) + for k, v := range msg.Actor.Attributes { + if v == "" { + continue + } + + attributes[k] = v + } + + if msg.Scope != "" { + attributes["scope"] = msg.Scope + } + + return aggregatedEvent{ + Timestamp: ts, + Type: string(msg.Type), + Action: string(msg.Action), + ActorID: shortID(msg.Actor.ID), + ActorName: attributes["name"], + ActorFullID: msg.Actor.ID, + Attributes: attributes, + } +} + +func printAggregatedEvent(ev aggregatedEvent) { + ts := ev.Timestamp + if ts.IsZero() { + ts = time.Now() + } + ts = ts.UTC() + + actor := ev.ActorID + if actor == "" { + actor = ev.ActorName + } + if actor == "" { + actor = "-" + } + + attrs := mergedEventAttributes(ev) + keys := make([]string, 0, len(attrs)) + for k := range attrs { + keys = append(keys, k) + } + sort.Strings(keys) + + attrParts := make([]string, 0, len(keys)) + for _, k := range keys { + attrParts = append(attrParts, fmt.Sprintf("%s=%s", k, attrs[k])) + } + + suffix := "" + if len(attrParts) > 0 { + suffix = " (" + strings.Join(attrParts, ", ") + ")" + } + + fmt.Printf("%s %s %s %s%s\n", ts.Format(time.RFC3339Nano), ev.Type, ev.Action, actor, suffix) +} + +func printAggregatedEventJSON(ev aggregatedEvent) { + evCopy := ev + evCopy.Attributes = mergedEventAttributes(ev) + + b, err := json.Marshal(evCopy) + if err != nil { + log.Debugf("failed to marshal event to json: %v", err) + + return + } + + fmt.Println(string(b)) +} + +func mergedEventAttributes(ev aggregatedEvent) map[string]string { + if len(ev.Attributes) == 0 && ev.ActorName == "" && ev.ActorFullID == "" { + return nil + } + + attrs := make(map[string]string, len(ev.Attributes)+2) + for k, v := range ev.Attributes { + if v == "" { + continue + } + + attrs[k] = v + } + + if ev.ActorName != "" { + attrs["name"] = ev.ActorName + } + + if ev.ActorFullID != "" { + attrs["id"] = ev.ActorFullID + } + + if len(attrs) == 0 { + return nil + } + + return attrs +} + +func streamDockerEvents( + ctx context.Context, + client *dockerClient.Client, + runtime clabruntime.ContainerRuntime, + registry *netlinkRegistry, + eventSink chan<- aggregatedEvent, + errSink chan<- error, +) { + filtersArgs := filters.NewArgs() + filtersArgs.Add("label", clabconstants.Containerlab) + + messages, errs := client.Events(ctx, dockerTypes.EventsOptions{Filters: filtersArgs}) + + for { + select { + case <-ctx.Done(): + errSink <- nil + + return + case err, ok := <-errs: + if !ok { + errSink <- nil + + return + } + + if err != nil && !errors.Is(err, context.Canceled) { + errSink <- err + + return + } + case msg, ok := <-messages: + if !ok { + errSink <- nil + + return + } + + registry.HandleDockerMessage(ctx, runtime, msg) + eventSink <- dockerMessageToEvent(msg) + } + } +} + +type netlinkRegistry struct { + mu sync.Mutex + watchers map[string]*netlinkWatcher + events chan<- aggregatedEvent +} + +func newNetlinkRegistry(events chan<- aggregatedEvent) *netlinkRegistry { + return &netlinkRegistry{ + watchers: make(map[string]*netlinkWatcher), + events: events, + } +} + +func (r *netlinkRegistry) Start(ctx context.Context, container *clabruntime.GenericContainer) { + clone := cloneContainer(container) + if clone == nil { + return + } + + id := clone.ID + if id == "" { + id = clone.ShortID + } + + if id == "" { + return + } + + r.mu.Lock() + if _, exists := r.watchers[id]; exists { + r.mu.Unlock() + + return + } + + watcherCtx, cancel := context.WithCancel(ctx) + watcher := &netlinkWatcher{ + container: clone, + cancel: cancel, + done: make(chan struct{}), + } + + r.watchers[id] = watcher + r.mu.Unlock() + + go watcher.run(watcherCtx, r) +} + +func (r *netlinkRegistry) Stop(id string) { + if id == "" { + return + } + + r.mu.Lock() + watcher, ok := r.watchers[id] + if ok { + delete(r.watchers, id) + } + r.mu.Unlock() + + if !ok { + return + } + + watcher.cancel() + <-watcher.done +} + +func (r *netlinkRegistry) remove(id string, watcher *netlinkWatcher) { + if id == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + if current, ok := r.watchers[id]; ok && current == watcher { + delete(r.watchers, id) + } +} + +func (r *netlinkRegistry) HandleDockerMessage( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + msg dockerEvents.Message, +) { + if msg.Type != dockerEvents.ContainerEventType { + return + } + + switch msg.Action { + case dockerEvents.ActionStart, dockerEvents.ActionUnPause, dockerEvents.ActionRestart: + container := containerFromDockerMessage(runtime, msg) + if container != nil { + r.Start(ctx, container) + } + case dockerEvents.ActionDie, dockerEvents.ActionStop, dockerEvents.ActionDestroy, dockerEvents.ActionKill: + id := msg.Actor.ID + if id == "" { + id = msg.ID + } + + r.Stop(id) + } +} + +type netlinkWatcher struct { + container *clabruntime.GenericContainer + cancel context.CancelFunc + done chan struct{} +} + +func (w *netlinkWatcher) run(ctx context.Context, registry *netlinkRegistry) { + defer close(w.done) + if w.container == nil { + return + } + + defer registry.remove(w.container.ID, w) + + containerName := firstContainerName(w.container) + if w.container.Runtime == nil { + log.Debugf("container %s has no runtime, skipping netlink watcher", containerName) + + return + } + + nsPath, err := waitForNamespacePath(ctx, w.container.Runtime, w.container.ID) + if err != nil || nsPath == "" { + log.Debugf("failed to resolve netns for container %s: %v", containerName, err) + + return + } + + nsHandle, err := netns.GetFromPath(nsPath) + if err != nil { + log.Debugf("failed to open netns for container %s: %v", containerName, err) + + return + } + defer nsHandle.Close() + + states, err := snapshotInterfaces(nsHandle) + if err != nil { + log.Debugf("failed to snapshot interfaces for container %s: %v", containerName, err) + states = make(map[int]ifaceSnapshot) + } + + updates := make(chan netlink.LinkUpdate, 32) + done := make(chan struct{}) + opts := netlink.LinkSubscribeOptions{Namespace: &nsHandle} + + if err := netlink.LinkSubscribeWithOptions(updates, done, opts); err != nil { + log.Debugf("failed to subscribe to netlink updates for container %s: %v", containerName, err) + + return + } + + for { + select { + case <-ctx.Done(): + close(done) + + return + case update, ok := <-updates: + if !ok { + return + } + + w.processUpdate(states, update, registry) + } + } +} + +func (w *netlinkWatcher) processUpdate( + states map[int]ifaceSnapshot, + update netlink.LinkUpdate, + registry *netlinkRegistry, +) { + if update.Link == nil { + return + } + + attrs := update.Link.Attrs() + if attrs == nil { + return + } + + snapshot := snapshotFromLink(update.Link) + previous, exists := states[snapshot.Index] + + switch update.Header.Type { + case unix.RTM_DELLINK: + if exists { + snapshot = previous + } + + delete(states, snapshot.Index) + registry.emitInterfaceEvent(w.container, "delete", snapshot) + case unix.RTM_NEWLINK: + if exists && snapshot.equal(previous) { + return + } + + action := "create" + if exists { + action = "update" + } + + states[snapshot.Index] = snapshot + registry.emitInterfaceEvent(w.container, action, snapshot) + } +} + +func (r *netlinkRegistry) emitInterfaceEvent( + container *clabruntime.GenericContainer, + action string, + snapshot ifaceSnapshot, +) { + if container == nil { + return + } + + attributes := map[string]string{ + "ifname": snapshot.Name, + "index": strconv.Itoa(snapshot.Index), + "mtu": strconv.Itoa(snapshot.MTU), + "state": snapshot.OperState, + "type": snapshot.Type, + "origin": "netlink", + } + + if snapshot.Alias != "" { + attributes["alias"] = snapshot.Alias + } + + if snapshot.MAC != "" { + attributes["mac"] = snapshot.MAC + } + + if lab := containerLabel(container); lab != "" { + attributes["lab"] = lab + } + + if name := firstContainerName(container); name != "" { + attributes["name"] = name + } + + r.events <- aggregatedEvent{ + Timestamp: time.Now(), + Type: "interface", + Action: action, + ActorID: container.ShortID, + ActorName: firstContainerName(container), + ActorFullID: container.ID, + Attributes: attributes, + } +} + +func waitForNamespacePath( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + containerID string, +) (string, error) { + const ( + attempts = 5 + retryDelay = 200 * time.Millisecond + ) + + var lastErr error + + for i := 0; i < attempts; i++ { + nsPath, err := runtime.GetNSPath(ctx, containerID) + if err == nil && nsPath != "" { + return nsPath, nil + } + + if err != nil { + lastErr = err + } + + select { + case <-time.After(retryDelay): + case <-ctx.Done(): + return "", ctx.Err() + } + } + + if lastErr != nil { + return "", lastErr + } + + return "", fmt.Errorf("namespace path not ready for container %s", containerID) +} + +func snapshotInterfaces(nsHandle netns.NsHandle) (map[int]ifaceSnapshot, error) { + netHandle, err := netlink.NewHandleAt(nsHandle) + if err != nil { + return nil, fmt.Errorf("unable to enter namespace: %w", err) + } + defer netHandle.Close() + + links, err := netHandle.LinkList() + if err != nil { + return nil, fmt.Errorf("unable to list links: %w", err) + } + + states := make(map[int]ifaceSnapshot, len(links)) + for _, link := range links { + snapshot := snapshotFromLink(link) + states[snapshot.Index] = snapshot + } + + return states, nil +} + +func snapshotFromLink(link netlink.Link) ifaceSnapshot { + attrs := link.Attrs() + + snapshot := ifaceSnapshot{ + Type: link.Type(), + } + + if attrs != nil { + snapshot.Index = attrs.Index + snapshot.Name = attrs.Name + snapshot.Alias = attrs.Alias + snapshot.MTU = attrs.MTU + if len(attrs.HardwareAddr) > 0 { + snapshot.MAC = attrs.HardwareAddr.String() + } + snapshot.OperState = attrs.OperState.String() + } + + return snapshot +} + +type ifaceSnapshot struct { + Index int + Name string + Alias string + MTU int + MAC string + OperState string + Type string +} + +func (s ifaceSnapshot) equal(other ifaceSnapshot) bool { + return s.Index == other.Index && + s.Name == other.Name && + s.Alias == other.Alias && + s.MTU == other.MTU && + s.MAC == other.MAC && + s.OperState == other.OperState && + s.Type == other.Type +} + +func isRunningContainer(container *clabruntime.GenericContainer) bool { + if container == nil { + return false + } + + return strings.EqualFold(container.State, "running") +} + +func cloneContainer(container *clabruntime.GenericContainer) *clabruntime.GenericContainer { + if container == nil { + return nil + } + + clone := &clabruntime.GenericContainer{ + Names: append([]string{}, container.Names...), + ID: container.ID, + ShortID: container.ShortID, + Labels: copyStringMap(container.Labels), + } + + if clone.ShortID == "" { + clone.ShortID = shortID(clone.ID) + } + + if container.Runtime == nil { + return nil + } + + clone.SetRuntime(container.Runtime) + + return clone +} + +func copyStringMap(input map[string]string) map[string]string { + if len(input) == 0 { + return nil + } + + result := make(map[string]string, len(input)) + for k, v := range input { + result[k] = v + } + + return result +} + +func containerFromDockerMessage( + runtime clabruntime.ContainerRuntime, + msg dockerEvents.Message, +) *clabruntime.GenericContainer { + name := msg.Actor.Attributes["name"] + if msg.Actor.ID == "" && name == "" { + return nil + } + + container := &clabruntime.GenericContainer{ + Names: []string{name}, + ID: msg.Actor.ID, + ShortID: shortID(msg.Actor.ID), + Labels: map[string]string{}, + } + + if lab, ok := msg.Actor.Attributes[clabconstants.Containerlab]; ok && lab != "" { + container.Labels[clabconstants.Containerlab] = lab + } + + if runtime != nil { + container.SetRuntime(runtime) + } + + if container.ShortID == "" { + container.ShortID = shortID(container.ID) + } + + return container +} + +func firstContainerName(container *clabruntime.GenericContainer) string { + if container == nil || len(container.Names) == 0 { + return "" + } + + return container.Names[0] +} + +func containerLabel(container *clabruntime.GenericContainer) string { + if container == nil || container.Labels == nil { + return "" + } + + return container.Labels[clabconstants.Containerlab] +} + +func shortID(id string) string { + if len(id) > 12 { + return id[:12] + } + + return id +} diff --git a/cmd/options.go b/cmd/options.go index d77814afd..3b2153f87 100644 --- a/cmd/options.go +++ b/cmd/options.go @@ -52,6 +52,9 @@ func GetOptions() *Options { MermaidDirection: "TD", DrawIOVersion: "latest", }, + Events: &EventsOptions{ + Format: "plain", + }, ToolsAPI: &ToolsApiOptions{ Image: "ghcr.io/srl-labs/clab-api-server/clab-api-server:latest", Name: "clab-api-server", @@ -121,6 +124,7 @@ type Options struct { Exec *ExecOptions Inspect *InspectOptions Graph *GraphOptions + Events *EventsOptions ToolsAPI *ToolsApiOptions ToolsCert *ToolsCertOptions ToolsTxOffload *ToolsDisableTxOffloadOptions @@ -353,6 +357,10 @@ type GraphOptions struct { StaticDirectory string } +type EventsOptions struct { + Format string +} + type ToolsApiOptions struct { Image string Name string diff --git a/cmd/root.go b/cmd/root.go index 79bd46642..7e0bd70a5 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -29,6 +29,7 @@ func subcommandRegisterFuncs() []func(*Options) (*cobra.Command, error) { execCmd, generateCmd, graphCmd, + eventsCmd, inspectCmd, redeployCmd, saveCmd, diff --git a/docs/cmd/events.md b/docs/cmd/events.md new file mode 100644 index 000000000..048514d49 --- /dev/null +++ b/docs/cmd/events.md @@ -0,0 +1,50 @@ +# events command + +## Description + +The `events` command streams lifecycle updates for every Containerlab resource and augments them with interface change notifications collected from the container network namespaces. The output combines Docker's event feed with the netlink information that powers `containerlab inspect interfaces`, so you can observe container activity and interface state changes in real time without selecting a specific lab. + + +## Usage + +`containerlab [global-flags] events` + +The command respects the global flags such as `--runtime`, `--debug`, or `--log-level`. It adds a local `--format` option that controls the output representation. When invoked with no arguments it discovers all running labs and immediately begins streaming events; new labs that start after the command begins are picked up automatically. + +## Event format + +In the default `plain` format every line mirrors the `docker events` format: + +``` + (=, ...) +``` + +* **Docker events** show the short container ID as the actor and include the original Docker attributes (for example `image`, `name`, `containerlab`, `scope`, …). +* **Interface events** use `type` `interface` and `origin=netlink` in the attribute list. They also report interface-specific data such as `ifname`, `state`, `mtu`, `mac`, `type`, `alias`, and the lab label. The actor is still the container short ID, and the container name is supplied in the attributes (`name=...`). +* Interface notifications are emitted when a link appears, disappears, or when its relevant properties (operational state, MTU, alias, MAC address, type) change. + +When `--format json` is used, each event becomes a single JSON object on its own line. The fields match the plain output (`timestamp`, `type`, `action`, `actor_id`, `actor_name`, `actor_full_id`) and include an `attributes` map with the same key/value pairs that the plain formatter prints. + +## Examples + +### Watch an existing lab and new deployments + +``` +$ sudo containerlab events +2024-07-01T11:02:56.123456000Z container start 5d0b5a9ad3f1 (containerlab=frr-lab, image=ghcr.io/srl-labs/frr, name=clab-frr-lab-frr01) +2024-07-01T11:02:57.004321000Z interface create 5d0b5a9ad3f1 (ifname=eth0, index=22, lab=frr-lab, mac=02:42:ac:14:00:02, mtu=1500, name=clab-frr-lab-frr01, origin=netlink, state=up, type=veth) +2024-07-01T11:02:57.104512000Z interface update 5d0b5a9ad3f1 (ifname=eth0, index=22, lab=frr-lab, mac=02:42:ac:14:00:02, mtu=9000, name=clab-frr-lab-frr01, origin=netlink, state=up, type=veth) +2024-07-01T11:05:12.918273000Z container die 5d0b5a9ad3f1 (containerlab=frr-lab, exitCode=0, image=ghcr.io/srl-labs/frr, name=clab-frr-lab-frr01) +2024-07-01T11:05:13.018456000Z interface delete 5d0b5a9ad3f1 (ifname=eth0, index=22, lab=frr-lab, name=clab-frr-lab-frr01, origin=netlink, state=up, type=veth) +``` + +The stream contains all currently running labs and stays active to capture subsequent deployments, restarts, or interface adjustments. + +### Use with alternative runtimes + +The command currently supports the Docker runtime. When `--runtime` selects another backend (for example Podman) the command exits with an explanatory error so that you can adjust the configuration or fall back to Docker. + +## See also + +* [`inspect interfaces`](inspect/interfaces.md) – produces a point-in-time view of the same interface details that `events` reports continuously. +* `docker events` – the raw runtime feed that Containerlab builds upon. diff --git a/mkdocs.yml b/mkdocs.yml index 68237cb7a..448b61ef6 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -99,6 +99,7 @@ nav: - inspect: - cmd/inspect/index.md - interfaces: cmd/inspect/interfaces.md + - events: cmd/events.md - save: cmd/save.md - exec: cmd/exec.md - generate: cmd/generate.md From 9dd7cabde04680a62cdb0b8f390bbf85e7e14108 Mon Sep 17 00:00:00 2001 From: Flosch62 Date: Sat, 18 Oct 2025 16:54:01 +0200 Subject: [PATCH 02/10] added robotframework test --- tests/01-smoke/26-events.robot | 119 +++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 tests/01-smoke/26-events.robot diff --git a/tests/01-smoke/26-events.robot b/tests/01-smoke/26-events.robot new file mode 100644 index 000000000..dd3207825 --- /dev/null +++ b/tests/01-smoke/26-events.robot @@ -0,0 +1,119 @@ +*** Comments *** +This suite verifies the `containerlab events` command for both plain and JSON output formats. + + +*** Settings *** +Library OperatingSystem +Library Process +Library String +Resource ../common.robot + + +*** Variables *** +${runtime} docker +${lab-name} 2-linux-nodes +${topo} ${CURDIR}/01-linux-nodes.clab.yml + + +*** Test Cases *** +Events Command Streams Plain Output + [Documentation] Verify that the plain formatter emits container lifecycle and interface updates enriched with netlink attributes. + ${plain-log} Set Variable /tmp/clab-events-plain.log + ${plain-err} Set Variable /tmp/clab-events-plain.err + Remove File If Exists ${plain-log} + Remove File If Exists ${plain-err} + TRY + Start Events Process events_plain plain ${plain-log} ${plain-err} + Deploy Lab For Events + Sleep 5s + Destroy Lab For Events + Sleep 3s + Stop Events Process events_plain + ${plain-output} = Get File ${plain-log} + Log ${plain-output} + Should Contain ${plain-output} container create + Should Contain ${plain-output} container start + Should Contain ${plain-output} container die + Should Contain ${plain-output} interface create + Should Contain ${plain-output} origin=netlink + FINALLY + Cleanup Events Scenario events_plain + Remove File If Exists ${plain-log} + Remove File If Exists ${plain-err} + END + +Events Command Streams JSON Output + [Documentation] Verify that JSON formatted events remain valid JSON lines and retain interface metadata. + ${json-log} Set Variable /tmp/clab-events-json.log + ${json-err} Set Variable /tmp/clab-events-json.err + Remove File If Exists ${json-log} + Remove File If Exists ${json-err} + TRY + Start Events Process events_json json ${json-log} ${json-err} + Deploy Lab For Events + Sleep 5s + Destroy Lab For Events + Sleep 3s + Stop Events Process events_json + ${json-output} = Get File ${json-log} + Log ${json-output} + Should Not Be Empty ${json-output} + Should Contain ${json-output} "type":"container" + Should Contain ${json-output} "type":"interface" + Should Contain ${json-output} "origin":"netlink" + Validate JSON Lines ${json-log} + ${actor} Set Variable clab-${lab-name}-l1 + ${rc} ${output} = Run And Return Rc And Output + ... bash -lc "jq -r 'select(.actor_name==\"${actor}\") | .actor_id' ${json-log} | head -n 1" + Log ${output} + Should Be Equal As Integers ${rc} 0 + Should Not Be Empty ${output} + FINALLY + Cleanup Events Scenario events_json + Remove File If Exists ${json-log} + Remove File If Exists ${json-err} + END + + +*** Keywords *** +Remove File If Exists + [Arguments] ${path} + Run Keyword And Ignore Error Remove File ${path} + +Start Events Process + [Arguments] ${alias} ${format} ${stdout} ${stderr} + ${cmd} Set Variable ${CLAB_BIN} --runtime ${runtime} events --format ${format} + Start Process ${cmd} shell=True alias=${alias} stdout=${stdout} stderr=${stderr} + Sleep 1s + +Stop Events Process + [Arguments] ${alias} + Run Keyword And Ignore Error Terminate Process ${alias} kill=True + Run Keyword And Ignore Error Wait For Process ${alias} + +Deploy Lab For Events + ${rc} ${output} = Run And Return Rc And Output + ... ${CLAB_BIN} --runtime ${runtime} deploy -t ${topo} + Log ${output} + Should Be Equal As Integers ${rc} 0 + +Destroy Lab For Events + ${rc} ${output} = Run And Return Rc And Output + ... ${CLAB_BIN} --runtime ${runtime} destroy -t ${topo} --cleanup + Log ${output} + Should Be Equal As Integers ${rc} 0 + +Cleanup Events Scenario + [Arguments] ${alias} + Run Keyword And Ignore Error Terminate Process ${alias} kill=True + Run Keyword And Ignore Error Wait For Process ${alias} + Run Keyword And Ignore Error Run ${CLAB_BIN} --runtime ${runtime} destroy -t ${topo} --cleanup + +Validate JSON Lines + [Arguments] ${path} + ${result} = Process.Run Process + ... bash -lc "python -c 'import json,sys; [json.loads(line) for line in sys.stdin]' < ${path}" + ... shell=True + Log ${result.stdout} + Log ${result.stderr} + Should Be Equal As Integers ${result.rc} 0 From 328f603ea117e14a9ee5c2abc1893cc1cfafaa0e Mon Sep 17 00:00:00 2001 From: Flosch62 Date: Sat, 18 Oct 2025 17:10:33 +0200 Subject: [PATCH 03/10] Move event logic to clab package --- clab/events.go | 730 +++++++++++++++++++++++++++++++++++++++++++++++++ cmd/events.go | 727 +----------------------------------------------- 2 files changed, 739 insertions(+), 718 deletions(-) create mode 100644 clab/events.go diff --git a/clab/events.go b/clab/events.go new file mode 100644 index 000000000..821fa1d0b --- /dev/null +++ b/clab/events.go @@ -0,0 +1,730 @@ +package clab + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/charmbracelet/log" + dockerTypes "github.com/docker/docker/api/types" + dockerEvents "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" + dockerClient "github.com/docker/docker/client" + clabconstants "github.com/srl-labs/containerlab/constants" + clabcore "github.com/srl-labs/containerlab/core" + clabruntime "github.com/srl-labs/containerlab/runtime" + clabruntimedocker "github.com/srl-labs/containerlab/runtime/docker" + clabutils "github.com/srl-labs/containerlab/utils" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netns" + "golang.org/x/sys/unix" +) + +type EventsOptions struct { + Format string + Runtime string + ClabOptions []clabcore.ClabOption +} + +func Events(ctx context.Context, opts EventsOptions) error { //nolint:cyclop,funlen + if err := clabutils.CheckAndGetRootPrivs(); err != nil { + return err + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + c, err := clabcore.NewContainerLab(opts.ClabOptions...) + if err != nil { + return err + } + + runtime, ok := c.Runtimes[opts.Runtime] + if !ok { + return fmt.Errorf("runtime %q is not initialized", opts.Runtime) + } + + dockerRuntime, ok := runtime.(*clabruntimedocker.DockerRuntime) + if !ok { + return fmt.Errorf("events command currently supports only the %s runtime", clabruntimedocker.RuntimeName) + } + + format := strings.TrimSpace(strings.ToLower(opts.Format)) + if format == "" { + format = "plain" + } + + var printer func(aggregatedEvent) + switch format { + case "plain": + printer = printAggregatedEvent + case "json": + printer = printAggregatedEventJSON + default: + return fmt.Errorf("output format %q is not supported, use 'plain' or 'json'", opts.Format) + } + + eventCh := make(chan aggregatedEvent, 128) + errCh := make(chan error, 1) + registry := newNetlinkRegistry(eventCh) + + containers, err := c.ListContainers(ctx, clabcore.WithListclabLabelExists()) + if err != nil { + return fmt.Errorf("failed to list containers: %w", err) + } + + for idx := range containers { + container := containers[idx] + if !isRunningContainer(&container) { + continue + } + + registry.Start(ctx, &container) + } + + go streamDockerEvents(ctx, dockerRuntime.Client, dockerRuntime, registry, eventCh, errCh) + + for { + select { + case ev := <-eventCh: + printer(ev) + case err := <-errCh: + if err == nil || errors.Is(err, context.Canceled) { + return nil + } + + return err + case <-ctx.Done(): + return nil + } + } +} + +type aggregatedEvent struct { + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` + Action string `json:"action"` + ActorID string `json:"actor_id"` + ActorName string `json:"actor_name"` + ActorFullID string `json:"actor_full_id"` + Attributes map[string]string `json:"attributes,omitempty"` +} + +func dockerMessageToEvent(msg dockerEvents.Message) aggregatedEvent { + ts := time.Unix(0, msg.TimeNano) + if ts.IsZero() { + ts = time.Unix(msg.Time, 0) + } + if ts.IsZero() { + ts = time.Now() + } + + attributes := make(map[string]string, len(msg.Actor.Attributes)+1) + for k, v := range msg.Actor.Attributes { + if v == "" { + continue + } + + attributes[k] = v + } + + if msg.Scope != "" { + attributes["scope"] = msg.Scope + } + + return aggregatedEvent{ + Timestamp: ts, + Type: string(msg.Type), + Action: string(msg.Action), + ActorID: shortID(msg.Actor.ID), + ActorName: attributes["name"], + ActorFullID: msg.Actor.ID, + Attributes: attributes, + } +} + +func printAggregatedEvent(ev aggregatedEvent) { + ts := ev.Timestamp + if ts.IsZero() { + ts = time.Now() + } + ts = ts.UTC() + + actor := ev.ActorID + if actor == "" { + actor = ev.ActorName + } + if actor == "" { + actor = "-" + } + + attrs := mergedEventAttributes(ev) + keys := make([]string, 0, len(attrs)) + for k := range attrs { + keys = append(keys, k) + } + sort.Strings(keys) + + attrParts := make([]string, 0, len(keys)) + for _, k := range keys { + attrParts = append(attrParts, fmt.Sprintf("%s=%s", k, attrs[k])) + } + + suffix := "" + if len(attrParts) > 0 { + suffix = " (" + strings.Join(attrParts, ", ") + ")" + } + + fmt.Printf("%s %s %s %s%s\n", ts.Format(time.RFC3339Nano), ev.Type, ev.Action, actor, suffix) +} + +func printAggregatedEventJSON(ev aggregatedEvent) { + evCopy := ev + evCopy.Attributes = mergedEventAttributes(ev) + + b, err := json.Marshal(evCopy) + if err != nil { + log.Debugf("failed to marshal event to json: %v", err) + + return + } + + fmt.Println(string(b)) +} + +func mergedEventAttributes(ev aggregatedEvent) map[string]string { + if len(ev.Attributes) == 0 && ev.ActorName == "" && ev.ActorFullID == "" { + return nil + } + + attrs := make(map[string]string, len(ev.Attributes)+2) + for k, v := range ev.Attributes { + if v == "" { + continue + } + + attrs[k] = v + } + + if ev.ActorName != "" { + attrs["name"] = ev.ActorName + } + + if ev.ActorFullID != "" { + attrs["id"] = ev.ActorFullID + } + + if len(attrs) == 0 { + return nil + } + + return attrs +} + +func streamDockerEvents( + ctx context.Context, + client *dockerClient.Client, + runtime clabruntime.ContainerRuntime, + registry *netlinkRegistry, + eventSink chan<- aggregatedEvent, + errSink chan<- error, +) { + filtersArgs := filters.NewArgs() + filtersArgs.Add("label", clabconstants.Containerlab) + + messages, errs := client.Events(ctx, dockerTypes.EventsOptions{Filters: filtersArgs}) + + for { + select { + case <-ctx.Done(): + errSink <- nil + + return + case err, ok := <-errs: + if !ok { + errSink <- nil + + return + } + + if err != nil && !errors.Is(err, context.Canceled) { + errSink <- err + + return + } + case msg, ok := <-messages: + if !ok { + errSink <- nil + + return + } + + registry.HandleDockerMessage(ctx, runtime, msg) + eventSink <- dockerMessageToEvent(msg) + } + } +} + +type netlinkRegistry struct { + mu sync.Mutex + watchers map[string]*netlinkWatcher + events chan<- aggregatedEvent +} + +func newNetlinkRegistry(events chan<- aggregatedEvent) *netlinkRegistry { + return &netlinkRegistry{ + watchers: make(map[string]*netlinkWatcher), + events: events, + } +} + +func (r *netlinkRegistry) Start(ctx context.Context, container *clabruntime.GenericContainer) { + clone := cloneContainer(container) + if clone == nil { + return + } + + id := clone.ID + if id == "" { + id = clone.ShortID + } + + if id == "" { + return + } + + r.mu.Lock() + if _, exists := r.watchers[id]; exists { + r.mu.Unlock() + + return + } + + watcherCtx, cancel := context.WithCancel(ctx) + watcher := &netlinkWatcher{ + container: clone, + cancel: cancel, + done: make(chan struct{}), + } + + r.watchers[id] = watcher + r.mu.Unlock() + + go watcher.run(watcherCtx, r) +} + +func (r *netlinkRegistry) Stop(id string) { + if id == "" { + return + } + + r.mu.Lock() + watcher, ok := r.watchers[id] + if ok { + delete(r.watchers, id) + } + r.mu.Unlock() + + if !ok { + return + } + + watcher.cancel() + <-watcher.done +} + +func (r *netlinkRegistry) remove(id string, watcher *netlinkWatcher) { + if id == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + if current, ok := r.watchers[id]; ok && current == watcher { + delete(r.watchers, id) + } +} + +func (r *netlinkRegistry) HandleDockerMessage( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + msg dockerEvents.Message, +) { + if msg.Type != dockerEvents.ContainerEventType { + return + } + + switch msg.Action { + case dockerEvents.ActionStart, dockerEvents.ActionUnPause, dockerEvents.ActionRestart: + container := containerFromDockerMessage(runtime, msg) + if container != nil { + r.Start(ctx, container) + } + case dockerEvents.ActionDie, dockerEvents.ActionStop, dockerEvents.ActionDestroy, dockerEvents.ActionKill: + id := msg.Actor.ID + if id == "" { + id = msg.ID + } + + r.Stop(id) + } +} + +type netlinkWatcher struct { + container *clabruntime.GenericContainer + cancel context.CancelFunc + done chan struct{} +} + +func (w *netlinkWatcher) run(ctx context.Context, registry *netlinkRegistry) { + defer close(w.done) + if w.container == nil { + return + } + + defer registry.remove(w.container.ID, w) + + containerName := firstContainerName(w.container) + if w.container.Runtime == nil { + log.Debugf("container %s has no runtime, skipping netlink watcher", containerName) + + return + } + + nsPath, err := waitForNamespacePath(ctx, w.container.Runtime, w.container.ID) + if err != nil || nsPath == "" { + log.Debugf("failed to resolve netns for container %s: %v", containerName, err) + + return + } + + nsHandle, err := netns.GetFromPath(nsPath) + if err != nil { + log.Debugf("failed to open netns for container %s: %v", containerName, err) + + return + } + defer nsHandle.Close() + + states, err := snapshotInterfaces(nsHandle) + if err != nil { + log.Debugf("failed to snapshot interfaces for container %s: %v", containerName, err) + states = make(map[int]ifaceSnapshot) + } + + updates := make(chan netlink.LinkUpdate, 32) + done := make(chan struct{}) + opts := netlink.LinkSubscribeOptions{Namespace: &nsHandle} + + if err := netlink.LinkSubscribeWithOptions(updates, done, opts); err != nil { + log.Debugf("failed to subscribe to netlink updates for container %s: %v", containerName, err) + + return + } + + for { + select { + case <-ctx.Done(): + close(done) + + return + case update, ok := <-updates: + if !ok { + return + } + + w.processUpdate(states, update, registry) + } + } +} + +func (w *netlinkWatcher) processUpdate( + states map[int]ifaceSnapshot, + update netlink.LinkUpdate, + registry *netlinkRegistry, +) { + if update.Link == nil { + return + } + + attrs := update.Link.Attrs() + if attrs == nil { + return + } + + snapshot := snapshotFromLink(update.Link) + previous, exists := states[snapshot.Index] + + switch update.Header.Type { + case unix.RTM_DELLINK: + if exists { + snapshot = previous + } + + delete(states, snapshot.Index) + registry.emitInterfaceEvent(w.container, "delete", snapshot) + case unix.RTM_NEWLINK: + if exists && snapshot.equal(previous) { + return + } + + action := "create" + if exists { + action = "update" + } + + states[snapshot.Index] = snapshot + registry.emitInterfaceEvent(w.container, action, snapshot) + } +} + +func (r *netlinkRegistry) emitInterfaceEvent( + container *clabruntime.GenericContainer, + action string, + snapshot ifaceSnapshot, +) { + if container == nil { + return + } + + attributes := map[string]string{ + "ifname": snapshot.Name, + "index": strconv.Itoa(snapshot.Index), + "mtu": strconv.Itoa(snapshot.MTU), + "state": snapshot.OperState, + "type": snapshot.Type, + "origin": "netlink", + } + + if snapshot.Alias != "" { + attributes["alias"] = snapshot.Alias + } + + if snapshot.MAC != "" { + attributes["mac"] = snapshot.MAC + } + + if lab := containerLabel(container); lab != "" { + attributes["lab"] = lab + } + + if name := firstContainerName(container); name != "" { + attributes["name"] = name + } + + r.events <- aggregatedEvent{ + Timestamp: time.Now(), + Type: "interface", + Action: action, + ActorID: container.ShortID, + ActorName: firstContainerName(container), + ActorFullID: container.ID, + Attributes: attributes, + } +} + +func waitForNamespacePath( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + containerID string, +) (string, error) { + const ( + attempts = 5 + retryDelay = 200 * time.Millisecond + ) + + var lastErr error + + for i := 0; i < attempts; i++ { + nsPath, err := runtime.GetNSPath(ctx, containerID) + if err == nil && nsPath != "" { + return nsPath, nil + } + + if err != nil { + lastErr = err + } + + select { + case <-time.After(retryDelay): + case <-ctx.Done(): + return "", ctx.Err() + } + } + + if lastErr != nil { + return "", lastErr + } + + return "", fmt.Errorf("namespace path not ready for container %s", containerID) +} + +func snapshotInterfaces(nsHandle netns.NsHandle) (map[int]ifaceSnapshot, error) { + netHandle, err := netlink.NewHandleAt(nsHandle) + if err != nil { + return nil, fmt.Errorf("unable to enter namespace: %w", err) + } + defer netHandle.Close() + + links, err := netHandle.LinkList() + if err != nil { + return nil, fmt.Errorf("unable to list links: %w", err) + } + + states := make(map[int]ifaceSnapshot, len(links)) + for _, link := range links { + snapshot := snapshotFromLink(link) + states[snapshot.Index] = snapshot + } + + return states, nil +} + +func snapshotFromLink(link netlink.Link) ifaceSnapshot { + attrs := link.Attrs() + + snapshot := ifaceSnapshot{ + Type: link.Type(), + } + + if attrs != nil { + snapshot.Index = attrs.Index + snapshot.Name = attrs.Name + snapshot.Alias = attrs.Alias + snapshot.MTU = attrs.MTU + if len(attrs.HardwareAddr) > 0 { + snapshot.MAC = attrs.HardwareAddr.String() + } + snapshot.OperState = attrs.OperState.String() + } + + return snapshot +} + +type ifaceSnapshot struct { + Index int + Name string + Alias string + MTU int + MAC string + OperState string + Type string +} + +func (s ifaceSnapshot) equal(other ifaceSnapshot) bool { + return s.Index == other.Index && + s.Name == other.Name && + s.Alias == other.Alias && + s.MTU == other.MTU && + s.MAC == other.MAC && + s.OperState == other.OperState && + s.Type == other.Type +} + +func isRunningContainer(container *clabruntime.GenericContainer) bool { + if container == nil { + return false + } + + return strings.EqualFold(container.State, "running") +} + +func cloneContainer(container *clabruntime.GenericContainer) *clabruntime.GenericContainer { + if container == nil { + return nil + } + + clone := &clabruntime.GenericContainer{ + Names: append([]string{}, container.Names...), + ID: container.ID, + ShortID: container.ShortID, + Labels: copyStringMap(container.Labels), + } + + if clone.ShortID == "" { + clone.ShortID = shortID(clone.ID) + } + + if container.Runtime == nil { + return nil + } + + clone.SetRuntime(container.Runtime) + + return clone +} + +func copyStringMap(input map[string]string) map[string]string { + if len(input) == 0 { + return nil + } + + result := make(map[string]string, len(input)) + for k, v := range input { + result[k] = v + } + + return result +} + +func containerFromDockerMessage( + runtime clabruntime.ContainerRuntime, + msg dockerEvents.Message, +) *clabruntime.GenericContainer { + name := msg.Actor.Attributes["name"] + if msg.Actor.ID == "" && name == "" { + return nil + } + + container := &clabruntime.GenericContainer{ + Names: []string{name}, + ID: msg.Actor.ID, + ShortID: shortID(msg.Actor.ID), + Labels: map[string]string{}, + } + + if lab, ok := msg.Actor.Attributes[clabconstants.Containerlab]; ok && lab != "" { + container.Labels[clabconstants.Containerlab] = lab + } + + if runtime != nil { + container.SetRuntime(runtime) + } + + if container.ShortID == "" { + container.ShortID = shortID(container.ID) + } + + return container +} + +func firstContainerName(container *clabruntime.GenericContainer) string { + if container == nil || len(container.Names) == 0 { + return "" + } + + return container.Names[0] +} + +func containerLabel(container *clabruntime.GenericContainer) string { + if container == nil || container.Labels == nil { + return "" + } + + return container.Labels[clabconstants.Containerlab] +} + +func shortID(id string) string { + if len(id) > 12 { + return id[:12] + } + + return id +} diff --git a/cmd/events.go b/cmd/events.go index 9fd2946fb..5ee6a30bc 100644 --- a/cmd/events.go +++ b/cmd/events.go @@ -1,30 +1,8 @@ package cmd import ( - "context" - "encoding/json" - "errors" - "fmt" - "sort" - "strconv" - "strings" - "sync" - "time" - - "github.com/charmbracelet/log" - dockerTypes "github.com/docker/docker/api/types" - dockerEvents "github.com/docker/docker/api/types/events" - "github.com/docker/docker/api/types/filters" - dockerClient "github.com/docker/docker/client" "github.com/spf13/cobra" - clabconstants "github.com/srl-labs/containerlab/constants" - clabcore "github.com/srl-labs/containerlab/core" - clabruntime "github.com/srl-labs/containerlab/runtime" - clabruntimedocker "github.com/srl-labs/containerlab/runtime/docker" - clabutils "github.com/srl-labs/containerlab/utils" - "github.com/vishvananda/netlink" - "github.com/vishvananda/netns" - "golang.org/x/sys/unix" + clabevents "github.com/srl-labs/containerlab/clab" ) func eventsCmd(o *Options) (*cobra.Command, error) { @@ -33,8 +11,8 @@ func eventsCmd(o *Options) (*cobra.Command, error) { Short: "stream lab lifecycle and interface events", Long: "stream docker runtime events as well as container interface updates for all running labs\n" + "reference: https://containerlab.dev/cmd/events/", - RunE: func(cobraCmd *cobra.Command, _ []string) error { - return eventsFn(cobraCmd, o) + RunE: func(cmd *cobra.Command, _ []string) error { + return eventsFn(cmd, o) }, } @@ -49,699 +27,12 @@ func eventsCmd(o *Options) (*cobra.Command, error) { return c, nil } -func eventsFn(cobraCmd *cobra.Command, o *Options) error { - if err := clabutils.CheckAndGetRootPrivs(); err != nil { - return err - } - - ctx, cancel := context.WithCancel(cobraCmd.Context()) - defer cancel() - - c, err := clabcore.NewContainerLab(o.ToClabOptions()...) - if err != nil { - return err - } - - runtime, ok := c.Runtimes[o.Global.Runtime] - if !ok { - return fmt.Errorf("runtime %q is not initialized", o.Global.Runtime) - } - - dockerRuntime, ok := runtime.(*clabruntimedocker.DockerRuntime) - if !ok { - return fmt.Errorf("events command currently supports only the %s runtime", clabruntimedocker.RuntimeName) - } - - format := strings.TrimSpace(strings.ToLower(o.Events.Format)) - if format == "" { - format = "plain" - } - - var printer func(aggregatedEvent) - switch format { - case "plain": - printer = printAggregatedEvent - case "json": - printer = printAggregatedEventJSON - default: - return fmt.Errorf("output format %q is not supported, use 'plain' or 'json'", o.Events.Format) - } - - eventCh := make(chan aggregatedEvent, 128) - errCh := make(chan error, 1) - registry := newNetlinkRegistry(eventCh) - - containers, err := c.ListContainers(ctx, clabcore.WithListclabLabelExists()) - if err != nil { - return fmt.Errorf("failed to list containers: %w", err) - } - - for idx := range containers { - container := containers[idx] - if !isRunningContainer(&container) { - continue - } - - registry.Start(ctx, &container) - } - - go streamDockerEvents(ctx, dockerRuntime.Client, dockerRuntime, registry, eventCh, errCh) - - for { - select { - case ev := <-eventCh: - printer(ev) - case err := <-errCh: - if err == nil || errors.Is(err, context.Canceled) { - return nil - } - - return err - case <-ctx.Done(): - return nil - } - } -} - -type aggregatedEvent struct { - Timestamp time.Time `json:"timestamp"` - Type string `json:"type"` - Action string `json:"action"` - ActorID string `json:"actor_id"` - ActorName string `json:"actor_name"` - ActorFullID string `json:"actor_full_id"` - Attributes map[string]string `json:"attributes,omitempty"` -} - -func dockerMessageToEvent(msg dockerEvents.Message) aggregatedEvent { - ts := time.Unix(0, msg.TimeNano) - if ts.IsZero() { - ts = time.Unix(msg.Time, 0) - } - if ts.IsZero() { - ts = time.Now() - } - - attributes := make(map[string]string, len(msg.Actor.Attributes)+1) - for k, v := range msg.Actor.Attributes { - if v == "" { - continue - } - - attributes[k] = v - } - - if msg.Scope != "" { - attributes["scope"] = msg.Scope - } - - return aggregatedEvent{ - Timestamp: ts, - Type: string(msg.Type), - Action: string(msg.Action), - ActorID: shortID(msg.Actor.ID), - ActorName: attributes["name"], - ActorFullID: msg.Actor.ID, - Attributes: attributes, - } -} - -func printAggregatedEvent(ev aggregatedEvent) { - ts := ev.Timestamp - if ts.IsZero() { - ts = time.Now() - } - ts = ts.UTC() - - actor := ev.ActorID - if actor == "" { - actor = ev.ActorName - } - if actor == "" { - actor = "-" - } - - attrs := mergedEventAttributes(ev) - keys := make([]string, 0, len(attrs)) - for k := range attrs { - keys = append(keys, k) - } - sort.Strings(keys) - - attrParts := make([]string, 0, len(keys)) - for _, k := range keys { - attrParts = append(attrParts, fmt.Sprintf("%s=%s", k, attrs[k])) - } - - suffix := "" - if len(attrParts) > 0 { - suffix = " (" + strings.Join(attrParts, ", ") + ")" - } - - fmt.Printf("%s %s %s %s%s\n", ts.Format(time.RFC3339Nano), ev.Type, ev.Action, actor, suffix) -} - -func printAggregatedEventJSON(ev aggregatedEvent) { - evCopy := ev - evCopy.Attributes = mergedEventAttributes(ev) - - b, err := json.Marshal(evCopy) - if err != nil { - log.Debugf("failed to marshal event to json: %v", err) - - return - } - - fmt.Println(string(b)) -} - -func mergedEventAttributes(ev aggregatedEvent) map[string]string { - if len(ev.Attributes) == 0 && ev.ActorName == "" && ev.ActorFullID == "" { - return nil - } - - attrs := make(map[string]string, len(ev.Attributes)+2) - for k, v := range ev.Attributes { - if v == "" { - continue - } - - attrs[k] = v - } - - if ev.ActorName != "" { - attrs["name"] = ev.ActorName - } - - if ev.ActorFullID != "" { - attrs["id"] = ev.ActorFullID - } - - if len(attrs) == 0 { - return nil - } - - return attrs -} - -func streamDockerEvents( - ctx context.Context, - client *dockerClient.Client, - runtime clabruntime.ContainerRuntime, - registry *netlinkRegistry, - eventSink chan<- aggregatedEvent, - errSink chan<- error, -) { - filtersArgs := filters.NewArgs() - filtersArgs.Add("label", clabconstants.Containerlab) - - messages, errs := client.Events(ctx, dockerTypes.EventsOptions{Filters: filtersArgs}) - - for { - select { - case <-ctx.Done(): - errSink <- nil - - return - case err, ok := <-errs: - if !ok { - errSink <- nil - - return - } - - if err != nil && !errors.Is(err, context.Canceled) { - errSink <- err - - return - } - case msg, ok := <-messages: - if !ok { - errSink <- nil - - return - } - - registry.HandleDockerMessage(ctx, runtime, msg) - eventSink <- dockerMessageToEvent(msg) - } - } -} - -type netlinkRegistry struct { - mu sync.Mutex - watchers map[string]*netlinkWatcher - events chan<- aggregatedEvent -} - -func newNetlinkRegistry(events chan<- aggregatedEvent) *netlinkRegistry { - return &netlinkRegistry{ - watchers: make(map[string]*netlinkWatcher), - events: events, - } -} - -func (r *netlinkRegistry) Start(ctx context.Context, container *clabruntime.GenericContainer) { - clone := cloneContainer(container) - if clone == nil { - return - } - - id := clone.ID - if id == "" { - id = clone.ShortID - } - - if id == "" { - return - } - - r.mu.Lock() - if _, exists := r.watchers[id]; exists { - r.mu.Unlock() - - return - } - - watcherCtx, cancel := context.WithCancel(ctx) - watcher := &netlinkWatcher{ - container: clone, - cancel: cancel, - done: make(chan struct{}), - } - - r.watchers[id] = watcher - r.mu.Unlock() - - go watcher.run(watcherCtx, r) -} - -func (r *netlinkRegistry) Stop(id string) { - if id == "" { - return - } - - r.mu.Lock() - watcher, ok := r.watchers[id] - if ok { - delete(r.watchers, id) - } - r.mu.Unlock() - - if !ok { - return - } - - watcher.cancel() - <-watcher.done -} - -func (r *netlinkRegistry) remove(id string, watcher *netlinkWatcher) { - if id == "" { - return - } - - r.mu.Lock() - defer r.mu.Unlock() - - if current, ok := r.watchers[id]; ok && current == watcher { - delete(r.watchers, id) - } -} - -func (r *netlinkRegistry) HandleDockerMessage( - ctx context.Context, - runtime clabruntime.ContainerRuntime, - msg dockerEvents.Message, -) { - if msg.Type != dockerEvents.ContainerEventType { - return - } - - switch msg.Action { - case dockerEvents.ActionStart, dockerEvents.ActionUnPause, dockerEvents.ActionRestart: - container := containerFromDockerMessage(runtime, msg) - if container != nil { - r.Start(ctx, container) - } - case dockerEvents.ActionDie, dockerEvents.ActionStop, dockerEvents.ActionDestroy, dockerEvents.ActionKill: - id := msg.Actor.ID - if id == "" { - id = msg.ID - } - - r.Stop(id) - } -} - -type netlinkWatcher struct { - container *clabruntime.GenericContainer - cancel context.CancelFunc - done chan struct{} -} - -func (w *netlinkWatcher) run(ctx context.Context, registry *netlinkRegistry) { - defer close(w.done) - if w.container == nil { - return - } - - defer registry.remove(w.container.ID, w) - - containerName := firstContainerName(w.container) - if w.container.Runtime == nil { - log.Debugf("container %s has no runtime, skipping netlink watcher", containerName) - - return - } - - nsPath, err := waitForNamespacePath(ctx, w.container.Runtime, w.container.ID) - if err != nil || nsPath == "" { - log.Debugf("failed to resolve netns for container %s: %v", containerName, err) - - return - } - - nsHandle, err := netns.GetFromPath(nsPath) - if err != nil { - log.Debugf("failed to open netns for container %s: %v", containerName, err) - - return - } - defer nsHandle.Close() - - states, err := snapshotInterfaces(nsHandle) - if err != nil { - log.Debugf("failed to snapshot interfaces for container %s: %v", containerName, err) - states = make(map[int]ifaceSnapshot) - } - - updates := make(chan netlink.LinkUpdate, 32) - done := make(chan struct{}) - opts := netlink.LinkSubscribeOptions{Namespace: &nsHandle} - - if err := netlink.LinkSubscribeWithOptions(updates, done, opts); err != nil { - log.Debugf("failed to subscribe to netlink updates for container %s: %v", containerName, err) - - return - } - - for { - select { - case <-ctx.Done(): - close(done) - - return - case update, ok := <-updates: - if !ok { - return - } - - w.processUpdate(states, update, registry) - } - } -} - -func (w *netlinkWatcher) processUpdate( - states map[int]ifaceSnapshot, - update netlink.LinkUpdate, - registry *netlinkRegistry, -) { - if update.Link == nil { - return - } - - attrs := update.Link.Attrs() - if attrs == nil { - return - } - - snapshot := snapshotFromLink(update.Link) - previous, exists := states[snapshot.Index] - - switch update.Header.Type { - case unix.RTM_DELLINK: - if exists { - snapshot = previous - } - - delete(states, snapshot.Index) - registry.emitInterfaceEvent(w.container, "delete", snapshot) - case unix.RTM_NEWLINK: - if exists && snapshot.equal(previous) { - return - } - - action := "create" - if exists { - action = "update" - } - - states[snapshot.Index] = snapshot - registry.emitInterfaceEvent(w.container, action, snapshot) - } -} - -func (r *netlinkRegistry) emitInterfaceEvent( - container *clabruntime.GenericContainer, - action string, - snapshot ifaceSnapshot, -) { - if container == nil { - return - } - - attributes := map[string]string{ - "ifname": snapshot.Name, - "index": strconv.Itoa(snapshot.Index), - "mtu": strconv.Itoa(snapshot.MTU), - "state": snapshot.OperState, - "type": snapshot.Type, - "origin": "netlink", - } - - if snapshot.Alias != "" { - attributes["alias"] = snapshot.Alias - } - - if snapshot.MAC != "" { - attributes["mac"] = snapshot.MAC - } - - if lab := containerLabel(container); lab != "" { - attributes["lab"] = lab - } - - if name := firstContainerName(container); name != "" { - attributes["name"] = name - } - - r.events <- aggregatedEvent{ - Timestamp: time.Now(), - Type: "interface", - Action: action, - ActorID: container.ShortID, - ActorName: firstContainerName(container), - ActorFullID: container.ID, - Attributes: attributes, - } -} - -func waitForNamespacePath( - ctx context.Context, - runtime clabruntime.ContainerRuntime, - containerID string, -) (string, error) { - const ( - attempts = 5 - retryDelay = 200 * time.Millisecond - ) - - var lastErr error - - for i := 0; i < attempts; i++ { - nsPath, err := runtime.GetNSPath(ctx, containerID) - if err == nil && nsPath != "" { - return nsPath, nil - } - - if err != nil { - lastErr = err - } - - select { - case <-time.After(retryDelay): - case <-ctx.Done(): - return "", ctx.Err() - } - } - - if lastErr != nil { - return "", lastErr - } - - return "", fmt.Errorf("namespace path not ready for container %s", containerID) -} - -func snapshotInterfaces(nsHandle netns.NsHandle) (map[int]ifaceSnapshot, error) { - netHandle, err := netlink.NewHandleAt(nsHandle) - if err != nil { - return nil, fmt.Errorf("unable to enter namespace: %w", err) - } - defer netHandle.Close() - - links, err := netHandle.LinkList() - if err != nil { - return nil, fmt.Errorf("unable to list links: %w", err) - } - - states := make(map[int]ifaceSnapshot, len(links)) - for _, link := range links { - snapshot := snapshotFromLink(link) - states[snapshot.Index] = snapshot - } - - return states, nil -} - -func snapshotFromLink(link netlink.Link) ifaceSnapshot { - attrs := link.Attrs() - - snapshot := ifaceSnapshot{ - Type: link.Type(), - } - - if attrs != nil { - snapshot.Index = attrs.Index - snapshot.Name = attrs.Name - snapshot.Alias = attrs.Alias - snapshot.MTU = attrs.MTU - if len(attrs.HardwareAddr) > 0 { - snapshot.MAC = attrs.HardwareAddr.String() - } - snapshot.OperState = attrs.OperState.String() - } - - return snapshot -} - -type ifaceSnapshot struct { - Index int - Name string - Alias string - MTU int - MAC string - OperState string - Type string -} - -func (s ifaceSnapshot) equal(other ifaceSnapshot) bool { - return s.Index == other.Index && - s.Name == other.Name && - s.Alias == other.Alias && - s.MTU == other.MTU && - s.MAC == other.MAC && - s.OperState == other.OperState && - s.Type == other.Type -} - -func isRunningContainer(container *clabruntime.GenericContainer) bool { - if container == nil { - return false - } - - return strings.EqualFold(container.State, "running") -} - -func cloneContainer(container *clabruntime.GenericContainer) *clabruntime.GenericContainer { - if container == nil { - return nil - } - - clone := &clabruntime.GenericContainer{ - Names: append([]string{}, container.Names...), - ID: container.ID, - ShortID: container.ShortID, - Labels: copyStringMap(container.Labels), - } - - if clone.ShortID == "" { - clone.ShortID = shortID(clone.ID) - } - - if container.Runtime == nil { - return nil - } - - clone.SetRuntime(container.Runtime) - - return clone -} - -func copyStringMap(input map[string]string) map[string]string { - if len(input) == 0 { - return nil - } - - result := make(map[string]string, len(input)) - for k, v := range input { - result[k] = v - } - - return result -} - -func containerFromDockerMessage( - runtime clabruntime.ContainerRuntime, - msg dockerEvents.Message, -) *clabruntime.GenericContainer { - name := msg.Actor.Attributes["name"] - if msg.Actor.ID == "" && name == "" { - return nil - } - - container := &clabruntime.GenericContainer{ - Names: []string{name}, - ID: msg.Actor.ID, - ShortID: shortID(msg.Actor.ID), - Labels: map[string]string{}, - } - - if lab, ok := msg.Actor.Attributes[clabconstants.Containerlab]; ok && lab != "" { - container.Labels[clabconstants.Containerlab] = lab - } - - if runtime != nil { - container.SetRuntime(runtime) - } - - if container.ShortID == "" { - container.ShortID = shortID(container.ID) - } - - return container -} - -func firstContainerName(container *clabruntime.GenericContainer) string { - if container == nil || len(container.Names) == 0 { - return "" - } - - return container.Names[0] -} - -func containerLabel(container *clabruntime.GenericContainer) string { - if container == nil || container.Labels == nil { - return "" - } - - return container.Labels[clabconstants.Containerlab] -} - -func shortID(id string) string { - if len(id) > 12 { - return id[:12] +func eventsFn(cmd *cobra.Command, o *Options) error { + opts := clabevents.EventsOptions{ + Format: o.Events.Format, + Runtime: o.Global.Runtime, + ClabOptions: o.ToClabOptions(), } - return id + return clabevents.Events(cmd.Context(), opts) } From 8bdbd1c77eea73fcc8535bd2675fed3afbf0f9f4 Mon Sep 17 00:00:00 2001 From: Flosch62 Date: Sat, 18 Oct 2025 18:05:54 +0200 Subject: [PATCH 04/10] Generic runtime streaming function --- clab/events.go | 161 ++++++++++++++++++++++++--------------- runtime/docker/events.go | 79 +++++++++++++++++++ runtime/ignite/ignite.go | 7 ++ runtime/podman/podman.go | 7 ++ runtime/runtime.go | 30 ++++++++ utils/events.go | 52 +++++++++++++ 6 files changed, 275 insertions(+), 61 deletions(-) create mode 100644 runtime/docker/events.go create mode 100644 utils/events.go diff --git a/clab/events.go b/clab/events.go index 821fa1d0b..738ceece5 100644 --- a/clab/events.go +++ b/clab/events.go @@ -12,14 +12,9 @@ import ( "time" "github.com/charmbracelet/log" - dockerTypes "github.com/docker/docker/api/types" - dockerEvents "github.com/docker/docker/api/types/events" - "github.com/docker/docker/api/types/filters" - dockerClient "github.com/docker/docker/client" clabconstants "github.com/srl-labs/containerlab/constants" clabcore "github.com/srl-labs/containerlab/core" clabruntime "github.com/srl-labs/containerlab/runtime" - clabruntimedocker "github.com/srl-labs/containerlab/runtime/docker" clabutils "github.com/srl-labs/containerlab/utils" "github.com/vishvananda/netlink" "github.com/vishvananda/netns" @@ -50,11 +45,6 @@ func Events(ctx context.Context, opts EventsOptions) error { //nolint:cyclop,fun return fmt.Errorf("runtime %q is not initialized", opts.Runtime) } - dockerRuntime, ok := runtime.(*clabruntimedocker.DockerRuntime) - if !ok { - return fmt.Errorf("events command currently supports only the %s runtime", clabruntimedocker.RuntimeName) - } - format := strings.TrimSpace(strings.ToLower(opts.Format)) if format == "" { format = "plain" @@ -88,7 +78,18 @@ func Events(ctx context.Context, opts EventsOptions) error { //nolint:cyclop,fun registry.Start(ctx, &container) } - go streamDockerEvents(ctx, dockerRuntime.Client, dockerRuntime, registry, eventCh, errCh) + streamOpts := clabruntime.EventStreamOptions{ + Labels: map[string]string{ + clabconstants.Containerlab: "", + }, + } + + runtimeEvents, runtimeErrs, err := runtime.StreamEvents(ctx, streamOpts) + if err != nil { + return fmt.Errorf("failed to stream events for runtime %q: %w", opts.Runtime, err) + } + + go forwardRuntimeEvents(ctx, runtime, registry, runtimeEvents, runtimeErrs, eventCh, errCh) for { select { @@ -116,35 +117,46 @@ type aggregatedEvent struct { Attributes map[string]string `json:"attributes,omitempty"` } -func dockerMessageToEvent(msg dockerEvents.Message) aggregatedEvent { - ts := time.Unix(0, msg.TimeNano) - if ts.IsZero() { - ts = time.Unix(msg.Time, 0) - } +func aggregatedEventFromContainerEvent(ev clabruntime.ContainerEvent) aggregatedEvent { + ts := ev.Timestamp if ts.IsZero() { ts = time.Now() } - attributes := make(map[string]string, len(msg.Actor.Attributes)+1) - for k, v := range msg.Actor.Attributes { - if v == "" { - continue - } + attributes := copyStringMap(ev.Attributes) + + actorFullID := ev.ActorFullID + if actorFullID == "" { + actorFullID = ev.ActorID + } + + actorName := ev.ActorName + if actorName == "" && attributes != nil { + actorName = attributes["name"] + } - attributes[k] = v + short := ev.ActorID + if short == "" { + short = actorFullID } - if msg.Scope != "" { - attributes["scope"] = msg.Scope + action := strings.ToLower(ev.Action) + if action == "" { + action = ev.Action + } + + eventType := strings.ToLower(ev.Type) + if eventType == "" { + eventType = ev.Type } return aggregatedEvent{ Timestamp: ts, - Type: string(msg.Type), - Action: string(msg.Action), - ActorID: shortID(msg.Actor.ID), - ActorName: attributes["name"], - ActorFullID: msg.Actor.ID, + Type: eventType, + Action: action, + ActorID: shortID(short), + ActorName: actorName, + ActorFullID: actorFullID, Attributes: attributes, } } @@ -227,26 +239,22 @@ func mergedEventAttributes(ev aggregatedEvent) map[string]string { return attrs } -func streamDockerEvents( +func forwardRuntimeEvents( ctx context.Context, - client *dockerClient.Client, runtime clabruntime.ContainerRuntime, registry *netlinkRegistry, + runtimeEvents <-chan clabruntime.ContainerEvent, + runtimeErrs <-chan error, eventSink chan<- aggregatedEvent, errSink chan<- error, ) { - filtersArgs := filters.NewArgs() - filtersArgs.Add("label", clabconstants.Containerlab) - - messages, errs := client.Events(ctx, dockerTypes.EventsOptions{Filters: filtersArgs}) - for { select { case <-ctx.Done(): errSink <- nil return - case err, ok := <-errs: + case err, ok := <-runtimeErrs: if !ok { errSink <- nil @@ -258,15 +266,24 @@ func streamDockerEvents( return } - case msg, ok := <-messages: + case ev, ok := <-runtimeEvents: if !ok { errSink <- nil return } - registry.HandleDockerMessage(ctx, runtime, msg) - eventSink <- dockerMessageToEvent(msg) + registry.HandleContainerEvent(ctx, runtime, ev) + + aggregated := aggregatedEventFromContainerEvent(ev) + + select { + case eventSink <- aggregated: + case <-ctx.Done(): + errSink <- nil + + return + } } } } @@ -352,25 +369,27 @@ func (r *netlinkRegistry) remove(id string, watcher *netlinkWatcher) { } } -func (r *netlinkRegistry) HandleDockerMessage( +func (r *netlinkRegistry) HandleContainerEvent( ctx context.Context, runtime clabruntime.ContainerRuntime, - msg dockerEvents.Message, + ev clabruntime.ContainerEvent, ) { - if msg.Type != dockerEvents.ContainerEventType { + if !strings.EqualFold(ev.Type, clabruntime.EventTypeContainer) { return } - switch msg.Action { - case dockerEvents.ActionStart, dockerEvents.ActionUnPause, dockerEvents.ActionRestart: - container := containerFromDockerMessage(runtime, msg) + action := strings.ToLower(ev.Action) + + switch action { + case clabruntime.EventActionStart, clabruntime.EventActionUnpause, clabruntime.EventActionRestart: + container := containerFromEvent(runtime, ev) if container != nil { r.Start(ctx, container) } - case dockerEvents.ActionDie, dockerEvents.ActionStop, dockerEvents.ActionDestroy, dockerEvents.ActionKill: - id := msg.Actor.ID + case clabruntime.EventActionDie, clabruntime.EventActionStop, clabruntime.EventActionDestroy, clabruntime.EventActionKill: + id := ev.ActorFullID if id == "" { - id = msg.ID + id = ev.ActorID } r.Stop(id) @@ -674,34 +693,54 @@ func copyStringMap(input map[string]string) map[string]string { return result } -func containerFromDockerMessage( +func containerFromEvent( runtime clabruntime.ContainerRuntime, - msg dockerEvents.Message, + ev clabruntime.ContainerEvent, ) *clabruntime.GenericContainer { - name := msg.Actor.Attributes["name"] - if msg.Actor.ID == "" && name == "" { + attributes := ev.Attributes + + name := ev.ActorName + if name == "" && attributes != nil { + name = attributes["name"] + } + + id := ev.ActorFullID + if id == "" { + id = ev.ActorID + } + + if id == "" && name == "" { return nil } + short := ev.ActorID + if short == "" { + short = id + } + container := &clabruntime.GenericContainer{ - Names: []string{name}, - ID: msg.Actor.ID, - ShortID: shortID(msg.Actor.ID), - Labels: map[string]string{}, + ID: id, + ShortID: shortID(short), } - if lab, ok := msg.Actor.Attributes[clabconstants.Containerlab]; ok && lab != "" { - container.Labels[clabconstants.Containerlab] = lab + if name != "" { + container.Names = []string{name} } - if runtime != nil { - container.SetRuntime(runtime) + if attributes != nil { + if lab := attributes[clabconstants.Containerlab]; lab != "" { + container.Labels = map[string]string{clabconstants.Containerlab: lab} + } } if container.ShortID == "" { container.ShortID = shortID(container.ID) } + if runtime != nil { + container.SetRuntime(runtime) + } + return container } diff --git a/runtime/docker/events.go b/runtime/docker/events.go new file mode 100644 index 000000000..636a0171e --- /dev/null +++ b/runtime/docker/events.go @@ -0,0 +1,79 @@ +package docker + +import ( + "context" + "errors" + "fmt" + + dockerTypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" + + clabruntime "github.com/srl-labs/containerlab/runtime" + clabutils "github.com/srl-labs/containerlab/utils" +) + +func (d *DockerRuntime) StreamEvents( + ctx context.Context, + opts clabruntime.EventStreamOptions, +) (<-chan clabruntime.ContainerEvent, <-chan error, error) { + events := make(chan clabruntime.ContainerEvent, 128) + errs := make(chan error, 1) + + go d.streamDockerEvents(ctx, opts, events, errs) + + return events, errs, nil +} + +func (d *DockerRuntime) streamDockerEvents( + ctx context.Context, + opts clabruntime.EventStreamOptions, + eventSink chan<- clabruntime.ContainerEvent, + errSink chan<- error, +) { + defer close(eventSink) + defer close(errSink) + + filtersArgs := filters.NewArgs() + for key, value := range opts.Labels { + if value == "" { + filtersArgs.Add("label", key) + } else { + filtersArgs.Add("label", fmt.Sprintf("%s=%s", key, value)) + } + } + + messages, errs := d.Client.Events(ctx, dockerTypes.EventsOptions{Filters: filtersArgs}) + + for { + select { + case <-ctx.Done(): + return + case err, ok := <-errs: + if !ok { + return + } + + if err != nil && !errors.Is(err, context.Canceled) { + errSink <- err + + return + } + case msg, ok := <-messages: + if !ok { + return + } + + eventData := clabutils.DockerMessageToEventData(msg) + + eventSink <- clabruntime.ContainerEvent{ + Timestamp: eventData.Timestamp, + Type: eventData.Type, + Action: eventData.Action, + ActorID: eventData.ActorID, + ActorName: eventData.ActorName, + ActorFullID: eventData.ActorFullID, + Attributes: eventData.Attributes, + } + } + } +} diff --git a/runtime/ignite/ignite.go b/runtime/ignite/ignite.go index c441ee2bb..218c53224 100644 --- a/runtime/ignite/ignite.go +++ b/runtime/ignite/ignite.go @@ -523,3 +523,10 @@ func (*IgniteRuntime) GetCooCBindMounts() clabtypes.Binds { func (*IgniteRuntime) StreamLogs(ctx context.Context, containerName string) (io.ReadCloser, error) { return nil, fmt.Errorf("StreamLogs not implemented for Ignite runtime") } + +func (*IgniteRuntime) StreamEvents( + context.Context, + clabruntime.EventStreamOptions, +) (<-chan clabruntime.ContainerEvent, <-chan error, error) { + return nil, nil, fmt.Errorf("StreamEvents is not implemented for Ignite runtime") +} diff --git a/runtime/podman/podman.go b/runtime/podman/podman.go index 1f376ea81..e1a088b95 100644 --- a/runtime/podman/podman.go +++ b/runtime/podman/podman.go @@ -494,3 +494,10 @@ func (r *PodmanRuntime) GetRuntimeSocket() (string, error) { func (*PodmanRuntime) StreamLogs(ctx context.Context, containerName string) (io.ReadCloser, error) { return nil, fmt.Errorf("StreamLogs not implemented for Podman runtime") } + +func (*PodmanRuntime) StreamEvents( + context.Context, + runtime.EventStreamOptions, +) (<-chan runtime.ContainerEvent, <-chan error, error) { + return nil, nil, fmt.Errorf("StreamEvents is not implemented for Podman runtime") +} diff --git a/runtime/runtime.go b/runtime/runtime.go index 5758782ed..b56eded9e 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -78,6 +78,8 @@ type ContainerRuntime interface { // StreamLogs returns a reader for the container's logs // The caller needs to close the returned ReadCloser. StreamLogs(ctx context.Context, containerName string) (io.ReadCloser, error) + // StreamEvents streams runtime events that match provided options. + StreamEvents(ctx context.Context, opts EventStreamOptions) (<-chan ContainerEvent, <-chan error, error) } type ContainerStatus string @@ -88,6 +90,34 @@ const ( Stopped = "Stopped" ) +const ( + EventTypeContainer = "container" +) + +const ( + EventActionStart = "start" + EventActionUnpause = "unpause" + EventActionRestart = "restart" + EventActionDie = "die" + EventActionStop = "stop" + EventActionDestroy = "destroy" + EventActionKill = "kill" +) + +type EventStreamOptions struct { + Labels map[string]string +} + +type ContainerEvent struct { + Timestamp time.Time + Type string + Action string + ActorID string + ActorName string + ActorFullID string + Attributes map[string]string +} + type Initializer func() ContainerRuntime type RuntimeOption func(ContainerRuntime) diff --git a/utils/events.go b/utils/events.go new file mode 100644 index 000000000..6f9701491 --- /dev/null +++ b/utils/events.go @@ -0,0 +1,52 @@ +package utils + +import ( + "time" + + dockerEvents "github.com/docker/docker/api/types/events" +) + +// DockerEventData captures container-related information from a Docker event message. +type DockerEventData struct { + Timestamp time.Time + Type string + Action string + ActorID string + ActorName string + ActorFullID string + Attributes map[string]string +} + +// DockerMessageToEventData normalizes a Docker event message into DockerEventData. +func DockerMessageToEventData(msg dockerEvents.Message) DockerEventData { + ts := time.Unix(0, msg.TimeNano) + if ts.IsZero() { + ts = time.Unix(msg.Time, 0) + } + if ts.IsZero() { + ts = time.Now() + } + + attributes := make(map[string]string, len(msg.Actor.Attributes)+1) + for k, v := range msg.Actor.Attributes { + if v == "" { + continue + } + + attributes[k] = v + } + + if msg.Scope != "" { + attributes["scope"] = msg.Scope + } + + return DockerEventData{ + Timestamp: ts, + Type: string(msg.Type), + Action: string(msg.Action), + ActorID: msg.Actor.ID, + ActorName: attributes["name"], + ActorFullID: msg.Actor.ID, + Attributes: attributes, + } +} From 77642298b9ac46a92d4b2207acb344f7cd9544de Mon Sep 17 00:00:00 2001 From: Flosch62 Date: Sat, 18 Oct 2025 18:10:50 +0200 Subject: [PATCH 05/10] update mock --- mocks/mockruntime/runtime.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/mocks/mockruntime/runtime.go b/mocks/mockruntime/runtime.go index d408e3297..6a8c6be09 100644 --- a/mocks/mockruntime/runtime.go +++ b/mocks/mockruntime/runtime.go @@ -365,6 +365,22 @@ func (mr *MockContainerRuntimeMockRecorder) StopContainer(arg0, arg1 any) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopContainer", reflect.TypeOf((*MockContainerRuntime)(nil).StopContainer), arg0, arg1) } +// StreamEvents mocks base method. +func (m *MockContainerRuntime) StreamEvents(ctx context.Context, opts runtime.EventStreamOptions) (<-chan runtime.ContainerEvent, <-chan error, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StreamEvents", ctx, opts) + ret0, _ := ret[0].(<-chan runtime.ContainerEvent) + ret1, _ := ret[1].(<-chan error) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// StreamEvents indicates an expected call of StreamEvents. +func (mr *MockContainerRuntimeMockRecorder) StreamEvents(ctx, opts any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamEvents", reflect.TypeOf((*MockContainerRuntime)(nil).StreamEvents), ctx, opts) +} + // StreamLogs mocks base method. func (m *MockContainerRuntime) StreamLogs(ctx context.Context, containerName string) (io.ReadCloser, error) { m.ctrl.T.Helper() From 1f44381dd6e2557e7605dd4a674e6efb3c7532e0 Mon Sep 17 00:00:00 2001 From: Flosch62 Date: Sun, 19 Oct 2025 10:12:30 +0200 Subject: [PATCH 06/10] Refactor to core/events --- cmd/events.go | 18 +- core/events/formatter.go | 108 +++++ clab/events.go => core/events/netlink.go | 513 +++++------------------ core/events/options.go | 22 + core/events/stream.go | 202 +++++++++ core/events/types.go | 13 + utils/events.go | 4 - 7 files changed, 473 insertions(+), 407 deletions(-) create mode 100644 core/events/formatter.go rename clab/events.go => core/events/netlink.go (59%) create mode 100644 core/events/options.go create mode 100644 core/events/stream.go create mode 100644 core/events/types.go diff --git a/cmd/events.go b/cmd/events.go index 5ee6a30bc..f52006298 100644 --- a/cmd/events.go +++ b/cmd/events.go @@ -2,7 +2,8 @@ package cmd import ( "github.com/spf13/cobra" - clabevents "github.com/srl-labs/containerlab/clab" + clabevents "github.com/srl-labs/containerlab/core/events" + clabutils "github.com/srl-labs/containerlab/utils" ) func eventsCmd(o *Options) (*cobra.Command, error) { @@ -11,6 +12,10 @@ func eventsCmd(o *Options) (*cobra.Command, error) { Short: "stream lab lifecycle and interface events", Long: "stream docker runtime events as well as container interface updates for all running labs\n" + "reference: https://containerlab.dev/cmd/events/", + Aliases: []string{"ev"}, + PreRunE: func(*cobra.Command, []string) error { + return clabutils.CheckAndGetRootPrivs() + }, RunE: func(cmd *cobra.Command, _ []string) error { return eventsFn(cmd, o) }, @@ -24,15 +29,22 @@ func eventsCmd(o *Options) (*cobra.Command, error) { "output format. One of [plain, json]", ) + c.Example = `# Stream container and interface events in plain text +containerlab events + +# Stream events as JSON +containerlab events --format json` + return c, nil } func eventsFn(cmd *cobra.Command, o *Options) error { - opts := clabevents.EventsOptions{ + opts := clabevents.Options{ Format: o.Events.Format, Runtime: o.Global.Runtime, ClabOptions: o.ToClabOptions(), + Writer: cmd.OutOrStdout(), } - return clabevents.Events(cmd.Context(), opts) + return clabevents.Stream(cmd.Context(), opts) } diff --git a/core/events/formatter.go b/core/events/formatter.go new file mode 100644 index 000000000..9620f16b9 --- /dev/null +++ b/core/events/formatter.go @@ -0,0 +1,108 @@ +package events + +import ( + "encoding/json" + "fmt" + "io" + "sort" + "strings" + "time" +) + +type formatter func(aggregatedEvent) error + +func newFormatter(format string, w io.Writer) (formatter, error) { + normalized := strings.TrimSpace(strings.ToLower(format)) + if normalized == "" { + normalized = "plain" + } + + switch normalized { + case "plain": + return plainFormatter(w), nil + case "json": + return jsonFormatter(w), nil + default: + return nil, fmt.Errorf("output format %q is not supported, use 'plain' or 'json'", format) + } +} + +func plainFormatter(w io.Writer) formatter { + return func(ev aggregatedEvent) error { + ts := ev.Timestamp + if ts.IsZero() { + ts = time.Now() + } + ts = ts.UTC() + + actor := ev.ActorID + if actor == "" { + actor = ev.ActorName + } + if actor == "" { + actor = "-" + } + + attrs := mergedEventAttributes(ev) + keys := make([]string, 0, len(attrs)) + for k := range attrs { + keys = append(keys, k) + } + sort.Strings(keys) + + parts := make([]string, 0, len(keys)) + for _, k := range keys { + parts = append(parts, fmt.Sprintf("%s=%s", k, attrs[k])) + } + + suffix := "" + if len(parts) > 0 { + suffix = " (" + strings.Join(parts, ", ") + ")" + } + + _, err := fmt.Fprintf(w, "%s %s %s %s%s\n", ts.Format(time.RFC3339Nano), ev.Type, ev.Action, actor, suffix) + + return err + } +} + +func jsonFormatter(w io.Writer) formatter { + encoder := json.NewEncoder(w) + encoder.SetEscapeHTML(false) + + return func(ev aggregatedEvent) error { + copy := ev + copy.Attributes = mergedEventAttributes(ev) + + return encoder.Encode(copy) + } +} + +func mergedEventAttributes(ev aggregatedEvent) map[string]string { + if len(ev.Attributes) == 0 && ev.ActorName == "" && ev.ActorFullID == "" { + return nil + } + + attrs := make(map[string]string, len(ev.Attributes)+2) + for k, v := range ev.Attributes { + if v == "" { + continue + } + + attrs[k] = v + } + + if ev.ActorName != "" { + attrs["name"] = ev.ActorName + } + + if ev.ActorFullID != "" { + attrs["id"] = ev.ActorFullID + } + + if len(attrs) == 0 { + return nil + } + + return attrs +} diff --git a/clab/events.go b/core/events/netlink.go similarity index 59% rename from clab/events.go rename to core/events/netlink.go index 738ceece5..d0e29ff2b 100644 --- a/clab/events.go +++ b/core/events/netlink.go @@ -1,11 +1,8 @@ -package clab +package events import ( "context" - "encoding/json" - "errors" "fmt" - "sort" "strconv" "strings" "sync" @@ -13,295 +10,28 @@ import ( "github.com/charmbracelet/log" clabconstants "github.com/srl-labs/containerlab/constants" - clabcore "github.com/srl-labs/containerlab/core" clabruntime "github.com/srl-labs/containerlab/runtime" - clabutils "github.com/srl-labs/containerlab/utils" "github.com/vishvananda/netlink" "github.com/vishvananda/netns" "golang.org/x/sys/unix" ) -type EventsOptions struct { - Format string - Runtime string - ClabOptions []clabcore.ClabOption -} - -func Events(ctx context.Context, opts EventsOptions) error { //nolint:cyclop,funlen - if err := clabutils.CheckAndGetRootPrivs(); err != nil { - return err - } - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - c, err := clabcore.NewContainerLab(opts.ClabOptions...) - if err != nil { - return err - } - - runtime, ok := c.Runtimes[opts.Runtime] - if !ok { - return fmt.Errorf("runtime %q is not initialized", opts.Runtime) - } - - format := strings.TrimSpace(strings.ToLower(opts.Format)) - if format == "" { - format = "plain" - } - - var printer func(aggregatedEvent) - switch format { - case "plain": - printer = printAggregatedEvent - case "json": - printer = printAggregatedEventJSON - default: - return fmt.Errorf("output format %q is not supported, use 'plain' or 'json'", opts.Format) - } - - eventCh := make(chan aggregatedEvent, 128) - errCh := make(chan error, 1) - registry := newNetlinkRegistry(eventCh) - - containers, err := c.ListContainers(ctx, clabcore.WithListclabLabelExists()) - if err != nil { - return fmt.Errorf("failed to list containers: %w", err) - } - - for idx := range containers { - container := containers[idx] - if !isRunningContainer(&container) { - continue - } - - registry.Start(ctx, &container) - } - - streamOpts := clabruntime.EventStreamOptions{ - Labels: map[string]string{ - clabconstants.Containerlab: "", - }, - } - - runtimeEvents, runtimeErrs, err := runtime.StreamEvents(ctx, streamOpts) - if err != nil { - return fmt.Errorf("failed to stream events for runtime %q: %w", opts.Runtime, err) - } - - go forwardRuntimeEvents(ctx, runtime, registry, runtimeEvents, runtimeErrs, eventCh, errCh) - - for { - select { - case ev := <-eventCh: - printer(ev) - case err := <-errCh: - if err == nil || errors.Is(err, context.Canceled) { - return nil - } - - return err - case <-ctx.Done(): - return nil - } - } -} - -type aggregatedEvent struct { - Timestamp time.Time `json:"timestamp"` - Type string `json:"type"` - Action string `json:"action"` - ActorID string `json:"actor_id"` - ActorName string `json:"actor_name"` - ActorFullID string `json:"actor_full_id"` - Attributes map[string]string `json:"attributes,omitempty"` -} - -func aggregatedEventFromContainerEvent(ev clabruntime.ContainerEvent) aggregatedEvent { - ts := ev.Timestamp - if ts.IsZero() { - ts = time.Now() - } - - attributes := copyStringMap(ev.Attributes) - - actorFullID := ev.ActorFullID - if actorFullID == "" { - actorFullID = ev.ActorID - } - - actorName := ev.ActorName - if actorName == "" && attributes != nil { - actorName = attributes["name"] - } - - short := ev.ActorID - if short == "" { - short = actorFullID - } - - action := strings.ToLower(ev.Action) - if action == "" { - action = ev.Action - } - - eventType := strings.ToLower(ev.Type) - if eventType == "" { - eventType = ev.Type - } - - return aggregatedEvent{ - Timestamp: ts, - Type: eventType, - Action: action, - ActorID: shortID(short), - ActorName: actorName, - ActorFullID: actorFullID, - Attributes: attributes, - } -} - -func printAggregatedEvent(ev aggregatedEvent) { - ts := ev.Timestamp - if ts.IsZero() { - ts = time.Now() - } - ts = ts.UTC() - - actor := ev.ActorID - if actor == "" { - actor = ev.ActorName - } - if actor == "" { - actor = "-" - } - - attrs := mergedEventAttributes(ev) - keys := make([]string, 0, len(attrs)) - for k := range attrs { - keys = append(keys, k) - } - sort.Strings(keys) - - attrParts := make([]string, 0, len(keys)) - for _, k := range keys { - attrParts = append(attrParts, fmt.Sprintf("%s=%s", k, attrs[k])) - } - - suffix := "" - if len(attrParts) > 0 { - suffix = " (" + strings.Join(attrParts, ", ") + ")" - } - - fmt.Printf("%s %s %s %s%s\n", ts.Format(time.RFC3339Nano), ev.Type, ev.Action, actor, suffix) -} - -func printAggregatedEventJSON(ev aggregatedEvent) { - evCopy := ev - evCopy.Attributes = mergedEventAttributes(ev) - - b, err := json.Marshal(evCopy) - if err != nil { - log.Debugf("failed to marshal event to json: %v", err) - - return - } - - fmt.Println(string(b)) -} - -func mergedEventAttributes(ev aggregatedEvent) map[string]string { - if len(ev.Attributes) == 0 && ev.ActorName == "" && ev.ActorFullID == "" { - return nil - } - - attrs := make(map[string]string, len(ev.Attributes)+2) - for k, v := range ev.Attributes { - if v == "" { - continue - } - - attrs[k] = v - } - - if ev.ActorName != "" { - attrs["name"] = ev.ActorName - } - - if ev.ActorFullID != "" { - attrs["id"] = ev.ActorFullID - } - - if len(attrs) == 0 { - return nil - } - - return attrs -} - -func forwardRuntimeEvents( - ctx context.Context, - runtime clabruntime.ContainerRuntime, - registry *netlinkRegistry, - runtimeEvents <-chan clabruntime.ContainerEvent, - runtimeErrs <-chan error, - eventSink chan<- aggregatedEvent, - errSink chan<- error, -) { - for { - select { - case <-ctx.Done(): - errSink <- nil - - return - case err, ok := <-runtimeErrs: - if !ok { - errSink <- nil - - return - } - - if err != nil && !errors.Is(err, context.Canceled) { - errSink <- err - - return - } - case ev, ok := <-runtimeEvents: - if !ok { - errSink <- nil - - return - } - - registry.HandleContainerEvent(ctx, runtime, ev) - - aggregated := aggregatedEventFromContainerEvent(ev) - - select { - case eventSink <- aggregated: - case <-ctx.Done(): - errSink <- nil - - return - } - } - } -} - type netlinkRegistry struct { + ctx context.Context mu sync.Mutex watchers map[string]*netlinkWatcher events chan<- aggregatedEvent } -func newNetlinkRegistry(events chan<- aggregatedEvent) *netlinkRegistry { +func newNetlinkRegistry(ctx context.Context, events chan<- aggregatedEvent) *netlinkRegistry { return &netlinkRegistry{ + ctx: ctx, watchers: make(map[string]*netlinkWatcher), events: events, } } -func (r *netlinkRegistry) Start(ctx context.Context, container *clabruntime.GenericContainer) { +func (r *netlinkRegistry) Start(container *clabruntime.GenericContainer) { clone := cloneContainer(container) if clone == nil { return @@ -323,7 +53,7 @@ func (r *netlinkRegistry) Start(ctx context.Context, container *clabruntime.Gene return } - watcherCtx, cancel := context.WithCancel(ctx) + watcherCtx, cancel := context.WithCancel(r.ctx) watcher := &netlinkWatcher{ container: clone, cancel: cancel, @@ -370,7 +100,6 @@ func (r *netlinkRegistry) remove(id string, watcher *netlinkWatcher) { } func (r *netlinkRegistry) HandleContainerEvent( - ctx context.Context, runtime clabruntime.ContainerRuntime, ev clabruntime.ContainerEvent, ) { @@ -384,7 +113,7 @@ func (r *netlinkRegistry) HandleContainerEvent( case clabruntime.EventActionStart, clabruntime.EventActionUnpause, clabruntime.EventActionRestart: container := containerFromEvent(runtime, ev) if container != nil { - r.Start(ctx, container) + r.Start(container) } case clabruntime.EventActionDie, clabruntime.EventActionStop, clabruntime.EventActionDestroy, clabruntime.EventActionKill: id := ev.ActorFullID @@ -396,6 +125,82 @@ func (r *netlinkRegistry) HandleContainerEvent( } } +func cloneContainer(container *clabruntime.GenericContainer) *clabruntime.GenericContainer { + if container == nil { + return nil + } + + clone := &clabruntime.GenericContainer{ + Names: append([]string{}, container.Names...), + ID: container.ID, + ShortID: container.ShortID, + Labels: cloneStringMap(container.Labels), + } + + if clone.ShortID == "" { + clone.ShortID = shortID(clone.ID) + } + + if container.Runtime == nil { + return nil + } + + clone.SetRuntime(container.Runtime) + + return clone +} + +func containerFromEvent( + runtime clabruntime.ContainerRuntime, + ev clabruntime.ContainerEvent, +) *clabruntime.GenericContainer { + attributes := ev.Attributes + + name := ev.ActorName + if name == "" && attributes != nil { + name = attributes["name"] + } + + id := ev.ActorFullID + if id == "" { + id = ev.ActorID + } + + if id == "" && name == "" { + return nil + } + + short := ev.ActorID + if short == "" { + short = id + } + + container := &clabruntime.GenericContainer{ + ID: id, + ShortID: shortID(short), + } + + if name != "" { + container.Names = []string{name} + } + + if attributes != nil { + if lab := attributes[clabconstants.Containerlab]; lab != "" { + container.Labels = map[string]string{clabconstants.Containerlab: lab} + } + } + + if container.ShortID == "" { + container.ShortID = shortID(container.ID) + } + + if runtime != nil { + container.SetRuntime(runtime) + } + + return container +} + type netlinkWatcher struct { container *clabruntime.GenericContainer cancel context.CancelFunc @@ -504,6 +309,30 @@ func (w *netlinkWatcher) processUpdate( } } +func firstContainerName(container *clabruntime.GenericContainer) string { + if container == nil || len(container.Names) == 0 { + return "" + } + + return container.Names[0] +} + +func containerLabel(container *clabruntime.GenericContainer) string { + if container == nil || container.Labels == nil { + return "" + } + + return container.Labels[clabconstants.Containerlab] +} + +func shortID(id string) string { + if len(id) > 12 { + return id[:12] + } + + return id +} + func (r *netlinkRegistry) emitInterfaceEvent( container *clabruntime.GenericContainer, action string, @@ -538,7 +367,7 @@ func (r *netlinkRegistry) emitInterfaceEvent( attributes["name"] = name } - r.events <- aggregatedEvent{ + event := aggregatedEvent{ Timestamp: time.Now(), Type: "interface", Action: action, @@ -547,6 +376,11 @@ func (r *netlinkRegistry) emitInterfaceEvent( ActorFullID: container.ID, Attributes: attributes, } + + select { + case r.events <- event: + case <-r.ctx.Done(): + } } func waitForNamespacePath( @@ -646,124 +480,3 @@ func (s ifaceSnapshot) equal(other ifaceSnapshot) bool { s.OperState == other.OperState && s.Type == other.Type } - -func isRunningContainer(container *clabruntime.GenericContainer) bool { - if container == nil { - return false - } - - return strings.EqualFold(container.State, "running") -} - -func cloneContainer(container *clabruntime.GenericContainer) *clabruntime.GenericContainer { - if container == nil { - return nil - } - - clone := &clabruntime.GenericContainer{ - Names: append([]string{}, container.Names...), - ID: container.ID, - ShortID: container.ShortID, - Labels: copyStringMap(container.Labels), - } - - if clone.ShortID == "" { - clone.ShortID = shortID(clone.ID) - } - - if container.Runtime == nil { - return nil - } - - clone.SetRuntime(container.Runtime) - - return clone -} - -func copyStringMap(input map[string]string) map[string]string { - if len(input) == 0 { - return nil - } - - result := make(map[string]string, len(input)) - for k, v := range input { - result[k] = v - } - - return result -} - -func containerFromEvent( - runtime clabruntime.ContainerRuntime, - ev clabruntime.ContainerEvent, -) *clabruntime.GenericContainer { - attributes := ev.Attributes - - name := ev.ActorName - if name == "" && attributes != nil { - name = attributes["name"] - } - - id := ev.ActorFullID - if id == "" { - id = ev.ActorID - } - - if id == "" && name == "" { - return nil - } - - short := ev.ActorID - if short == "" { - short = id - } - - container := &clabruntime.GenericContainer{ - ID: id, - ShortID: shortID(short), - } - - if name != "" { - container.Names = []string{name} - } - - if attributes != nil { - if lab := attributes[clabconstants.Containerlab]; lab != "" { - container.Labels = map[string]string{clabconstants.Containerlab: lab} - } - } - - if container.ShortID == "" { - container.ShortID = shortID(container.ID) - } - - if runtime != nil { - container.SetRuntime(runtime) - } - - return container -} - -func firstContainerName(container *clabruntime.GenericContainer) string { - if container == nil || len(container.Names) == 0 { - return "" - } - - return container.Names[0] -} - -func containerLabel(container *clabruntime.GenericContainer) string { - if container == nil || container.Labels == nil { - return "" - } - - return container.Labels[clabconstants.Containerlab] -} - -func shortID(id string) string { - if len(id) > 12 { - return id[:12] - } - - return id -} diff --git a/core/events/options.go b/core/events/options.go new file mode 100644 index 000000000..f428a0c0e --- /dev/null +++ b/core/events/options.go @@ -0,0 +1,22 @@ +package events + +import ( + "io" + + clabcore "github.com/srl-labs/containerlab/core" +) + +type Options struct { + Format string + Runtime string + ClabOptions []clabcore.ClabOption + Writer io.Writer +} + +func (o Options) writer() io.Writer { + if o.Writer != nil { + return o.Writer + } + + return io.Discard +} diff --git a/core/events/stream.go b/core/events/stream.go new file mode 100644 index 000000000..e3846f70a --- /dev/null +++ b/core/events/stream.go @@ -0,0 +1,202 @@ +package events + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/charmbracelet/log" + clabconstants "github.com/srl-labs/containerlab/constants" + clabcore "github.com/srl-labs/containerlab/core" + clabruntime "github.com/srl-labs/containerlab/runtime" +) + +func Stream(ctx context.Context, opts Options) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + clab, err := clabcore.NewContainerLab(opts.ClabOptions...) + if err != nil { + return err + } + + runtime, ok := clab.Runtimes[opts.Runtime] + if !ok { + return fmt.Errorf("runtime %q is not initialized", opts.Runtime) + } + + printer, err := newFormatter(opts.Format, opts.writer()) + if err != nil { + return err + } + + eventCh := make(chan aggregatedEvent, 128) + registry := newNetlinkRegistry(ctx, eventCh) + + containers, err := clab.ListContainers(ctx, clabcore.WithListclabLabelExists()) + if err != nil { + return fmt.Errorf("failed to list containers: %w", err) + } + + for idx := range containers { + container := containers[idx] + if !isRunningContainer(&container) { + continue + } + + registry.Start(&container) + } + + streamOpts := clabruntime.EventStreamOptions{ + Labels: map[string]string{ + clabconstants.Containerlab: "", + }, + } + + runtimeEvents, runtimeErrs, err := runtime.StreamEvents(ctx, streamOpts) + if err != nil { + return fmt.Errorf("failed to stream events for runtime %q: %w", opts.Runtime, err) + } + + errCh := make(chan error, 1) + go forwardRuntimeEvents(ctx, runtime, registry, runtimeEvents, runtimeErrs, eventCh, errCh) + + runtimeErrors := errCh + + for { + select { + case ev := <-eventCh: + if err := printer(ev); err != nil { + log.Debugf("failed to write event: %v", err) + } + case err, ok := <-runtimeErrors: + if !ok { + runtimeErrors = nil + + continue + } + + if err != nil && !errors.Is(err, context.Canceled) { + return err + } + case <-ctx.Done(): + return nil + } + } +} + +func forwardRuntimeEvents( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + registry *netlinkRegistry, + runtimeEvents <-chan clabruntime.ContainerEvent, + runtimeErrs <-chan error, + eventSink chan<- aggregatedEvent, + errSink chan<- error, +) { + defer close(errSink) + + sendErr := func(err error) { + select { + case errSink <- err: + case <-ctx.Done(): + } + } + + for { + select { + case <-ctx.Done(): + return + case err, ok := <-runtimeErrs: + if !ok { + return + } + + if err != nil && !errors.Is(err, context.Canceled) { + sendErr(err) + + return + } + case ev, ok := <-runtimeEvents: + if !ok { + return + } + + registry.HandleContainerEvent(runtime, ev) + + aggregated := aggregatedEventFromContainerEvent(ev) + select { + case eventSink <- aggregated: + case <-ctx.Done(): + return + } + } + } +} + +func aggregatedEventFromContainerEvent(ev clabruntime.ContainerEvent) aggregatedEvent { + ts := ev.Timestamp + if ts.IsZero() { + ts = time.Now() + } + + attributes := cloneStringMap(ev.Attributes) + + actorFullID := ev.ActorFullID + if actorFullID == "" { + actorFullID = ev.ActorID + } + + actorName := ev.ActorName + if actorName == "" && attributes != nil { + actorName = attributes["name"] + } + + short := ev.ActorID + if short == "" { + short = actorFullID + } + + action := strings.ToLower(ev.Action) + if action == "" { + action = ev.Action + } + + eventType := strings.ToLower(ev.Type) + if eventType == "" { + eventType = ev.Type + } + + return aggregatedEvent{ + Timestamp: ts, + Type: eventType, + Action: action, + ActorID: shortID(short), + ActorName: actorName, + ActorFullID: actorFullID, + Attributes: attributes, + } +} + +func cloneStringMap(input map[string]string) map[string]string { + if len(input) == 0 { + return nil + } + + result := make(map[string]string, len(input)) + for k, v := range input { + result[k] = v + } + + return result +} + +func isRunningContainer(container *clabruntime.GenericContainer) bool { + if container == nil { + return false + } + + return strings.EqualFold(container.State, "running") +} diff --git a/core/events/types.go b/core/events/types.go new file mode 100644 index 000000000..f7b530a3c --- /dev/null +++ b/core/events/types.go @@ -0,0 +1,13 @@ +package events + +import "time" + +type aggregatedEvent struct { + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` + Action string `json:"action"` + ActorID string `json:"actor_id"` + ActorName string `json:"actor_name"` + ActorFullID string `json:"actor_full_id"` + Attributes map[string]string `json:"attributes,omitempty"` +} diff --git a/utils/events.go b/utils/events.go index 6f9701491..a501ab07b 100644 --- a/utils/events.go +++ b/utils/events.go @@ -29,10 +29,6 @@ func DockerMessageToEventData(msg dockerEvents.Message) DockerEventData { attributes := make(map[string]string, len(msg.Actor.Attributes)+1) for k, v := range msg.Actor.Attributes { - if v == "" { - continue - } - attributes[k] = v } From 8c4ba6dcc0b459a91ea845e947bbe05723791214 Mon Sep 17 00:00:00 2001 From: Flosch62 Date: Sun, 19 Oct 2025 10:49:30 +0200 Subject: [PATCH 07/10] docs --- cmd/events.go | 2 +- core/events/options.go | 1 + core/events/stream.go | 2 ++ docs/cmd/events.md | 26 +++++++++++++++----------- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/cmd/events.go b/cmd/events.go index f52006298..8bae61e4a 100644 --- a/cmd/events.go +++ b/cmd/events.go @@ -10,7 +10,7 @@ func eventsCmd(o *Options) (*cobra.Command, error) { c := &cobra.Command{ Use: "events", Short: "stream lab lifecycle and interface events", - Long: "stream docker runtime events as well as container interface updates for all running labs\n" + + Long: "stream container runtime events and interface updates for all running labs using the selected runtime\n" + "reference: https://containerlab.dev/cmd/events/", Aliases: []string{"ev"}, PreRunE: func(*cobra.Command, []string) error { diff --git a/core/events/options.go b/core/events/options.go index f428a0c0e..3bf49acc0 100644 --- a/core/events/options.go +++ b/core/events/options.go @@ -6,6 +6,7 @@ import ( clabcore "github.com/srl-labs/containerlab/core" ) +// Options configure how runtime and interface events are sourced and rendered. type Options struct { Format string Runtime string diff --git a/core/events/stream.go b/core/events/stream.go index e3846f70a..1b8f0b3a3 100644 --- a/core/events/stream.go +++ b/core/events/stream.go @@ -13,6 +13,8 @@ import ( clabruntime "github.com/srl-labs/containerlab/runtime" ) +// Stream subscribes to the selected runtime and netlink sources and forwards +// aggregated events to the configured writer. func Stream(ctx context.Context, opts Options) error { ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/docs/cmd/events.md b/docs/cmd/events.md index 048514d49..4326dae59 100644 --- a/docs/cmd/events.md +++ b/docs/cmd/events.md @@ -1,17 +1,18 @@ # events command -## Description +### Description -The `events` command streams lifecycle updates for every Containerlab resource and augments them with interface change notifications collected from the container network namespaces. The output combines Docker's event feed with the netlink information that powers `containerlab inspect interfaces`, so you can observe container activity and interface state changes in real time without selecting a specific lab. +The `events` command streams lifecycle updates for every Containerlab resource and augments them with interface change notifications collected from the container network namespaces. The output combines the selected runtime's event feed (for example Docker) with the netlink information that powers `containerlab inspect interfaces`, so you can observe container activity and interface state changes in real time without selecting a specific lab. +### Usage -## Usage +`containerlab [global-flags] events [local-flags]` -`containerlab [global-flags] events` +**aliases:** `ev` The command respects the global flags such as `--runtime`, `--debug`, or `--log-level`. It adds a local `--format` option that controls the output representation. When invoked with no arguments it discovers all running labs and immediately begins streaming events; new labs that start after the command begins are picked up automatically. -## Event format +### Event format In the default `plain` format every line mirrors the `docker events` format: @@ -19,15 +20,15 @@ In the default `plain` format every line mirrors the `docker events` format: (=, ...) ``` -* **Docker events** show the short container ID as the actor and include the original Docker attributes (for example `image`, `name`, `containerlab`, `scope`, …). +* **Runtime events** show the short container ID as the actor and include the original attributes supplied by the container runtime (for example `image`, `name`, `containerlab`, `scope`, …). * **Interface events** use `type` `interface` and `origin=netlink` in the attribute list. They also report interface-specific data such as `ifname`, `state`, `mtu`, `mac`, `type`, `alias`, and the lab label. The actor is still the container short ID, and the container name is supplied in the attributes (`name=...`). * Interface notifications are emitted when a link appears, disappears, or when its relevant properties (operational state, MTU, alias, MAC address, type) change. When `--format json` is used, each event becomes a single JSON object on its own line. The fields match the plain output (`timestamp`, `type`, `action`, `actor_id`, `actor_name`, `actor_full_id`) and include an `attributes` map with the same key/value pairs that the plain formatter prints. -## Examples +### Examples -### Watch an existing lab and new deployments +#### Watch an existing lab and new deployments ``` $ sudo containerlab events @@ -40,11 +41,14 @@ $ sudo containerlab events The stream contains all currently running labs and stays active to capture subsequent deployments, restarts, or interface adjustments. -### Use with alternative runtimes +#### Use with alternative runtimes -The command currently supports the Docker runtime. When `--runtime` selects another backend (for example Podman) the command exits with an explanatory error so that you can adjust the configuration or fall back to Docker. +Containerlab streams events from the runtime selected via the global `--runtime` flag. -## See also +> **Currently supported runtime:** `docker` +> Runtimes that do not implement the `events` API (or are not yet supported by Containerlab) will exit with an explanatory error. + +### See also * [`inspect interfaces`](inspect/interfaces.md) – produces a point-in-time view of the same interface details that `events` reports continuously. * `docker events` – the raw runtime feed that Containerlab builds upon. From 71dba693b617ec3d5dbe505b22c89d4461341978 Mon Sep 17 00:00:00 2001 From: Flosch62 Date: Sun, 19 Oct 2025 11:04:21 +0200 Subject: [PATCH 08/10] added --initial-state --- cmd/events.go | 17 ++++-- cmd/options.go | 3 +- core/events/netlink.go | 38 ++++++++----- core/events/options.go | 9 ++-- core/events/stream.go | 99 +++++++++++++++++++++++++++++++++- docs/cmd/events.md | 22 ++++++-- tests/01-smoke/26-events.robot | 26 ++++++++- 7 files changed, 186 insertions(+), 28 deletions(-) diff --git a/cmd/events.go b/cmd/events.go index 8bae61e4a..0330d71da 100644 --- a/cmd/events.go +++ b/cmd/events.go @@ -29,6 +29,14 @@ func eventsCmd(o *Options) (*cobra.Command, error) { "output format. One of [plain, json]", ) + c.Flags().BoolVarP( + &o.Events.IncludeInitialState, + "initial-state", + "i", + o.Events.IncludeInitialState, + "emit the current container and interface states before streaming new events", + ) + c.Example = `# Stream container and interface events in plain text containerlab events @@ -40,10 +48,11 @@ containerlab events --format json` func eventsFn(cmd *cobra.Command, o *Options) error { opts := clabevents.Options{ - Format: o.Events.Format, - Runtime: o.Global.Runtime, - ClabOptions: o.ToClabOptions(), - Writer: cmd.OutOrStdout(), + Format: o.Events.Format, + Runtime: o.Global.Runtime, + IncludeInitialState: o.Events.IncludeInitialState, + ClabOptions: o.ToClabOptions(), + Writer: cmd.OutOrStdout(), } return clabevents.Stream(cmd.Context(), opts) diff --git a/cmd/options.go b/cmd/options.go index 3b2153f87..6f7dba54c 100644 --- a/cmd/options.go +++ b/cmd/options.go @@ -358,7 +358,8 @@ type GraphOptions struct { } type EventsOptions struct { - Format string + Format string + IncludeInitialState bool } type ToolsApiOptions struct { diff --git a/core/events/netlink.go b/core/events/netlink.go index d0e29ff2b..c9029cd91 100644 --- a/core/events/netlink.go +++ b/core/events/netlink.go @@ -17,17 +17,19 @@ import ( ) type netlinkRegistry struct { - ctx context.Context - mu sync.Mutex - watchers map[string]*netlinkWatcher - events chan<- aggregatedEvent + ctx context.Context + mu sync.Mutex + watchers map[string]*netlinkWatcher + events chan<- aggregatedEvent + includeInitialSnapshot bool } -func newNetlinkRegistry(ctx context.Context, events chan<- aggregatedEvent) *netlinkRegistry { +func newNetlinkRegistry(ctx context.Context, events chan<- aggregatedEvent, includeInitialSnapshot bool) *netlinkRegistry { return &netlinkRegistry{ - ctx: ctx, - watchers: make(map[string]*netlinkWatcher), - events: events, + ctx: ctx, + watchers: make(map[string]*netlinkWatcher), + events: events, + includeInitialSnapshot: includeInitialSnapshot, } } @@ -55,9 +57,10 @@ func (r *netlinkRegistry) Start(container *clabruntime.GenericContainer) { watcherCtx, cancel := context.WithCancel(r.ctx) watcher := &netlinkWatcher{ - container: clone, - cancel: cancel, - done: make(chan struct{}), + container: clone, + cancel: cancel, + done: make(chan struct{}), + includeSnapshot: r.includeInitialSnapshot, } r.watchers[id] = watcher @@ -202,9 +205,10 @@ func containerFromEvent( } type netlinkWatcher struct { - container *clabruntime.GenericContainer - cancel context.CancelFunc - done chan struct{} + container *clabruntime.GenericContainer + cancel context.CancelFunc + done chan struct{} + includeSnapshot bool } func (w *netlinkWatcher) run(ctx context.Context, registry *netlinkRegistry) { @@ -243,6 +247,12 @@ func (w *netlinkWatcher) run(ctx context.Context, registry *netlinkRegistry) { states = make(map[int]ifaceSnapshot) } + if w.includeSnapshot { + for _, snapshot := range states { + registry.emitInterfaceEvent(w.container, "snapshot", snapshot) + } + } + updates := make(chan netlink.LinkUpdate, 32) done := make(chan struct{}) opts := netlink.LinkSubscribeOptions{Namespace: &nsHandle} diff --git a/core/events/options.go b/core/events/options.go index 3bf49acc0..d36f06488 100644 --- a/core/events/options.go +++ b/core/events/options.go @@ -8,10 +8,11 @@ import ( // Options configure how runtime and interface events are sourced and rendered. type Options struct { - Format string - Runtime string - ClabOptions []clabcore.ClabOption - Writer io.Writer + Format string + Runtime string + IncludeInitialState bool + ClabOptions []clabcore.ClabOption + Writer io.Writer } func (o Options) writer() io.Writer { diff --git a/core/events/stream.go b/core/events/stream.go index 1b8f0b3a3..ef8abe315 100644 --- a/core/events/stream.go +++ b/core/events/stream.go @@ -35,13 +35,17 @@ func Stream(ctx context.Context, opts Options) error { } eventCh := make(chan aggregatedEvent, 128) - registry := newNetlinkRegistry(ctx, eventCh) + registry := newNetlinkRegistry(ctx, eventCh, opts.IncludeInitialState) containers, err := clab.ListContainers(ctx, clabcore.WithListclabLabelExists()) if err != nil { return fmt.Errorf("failed to list containers: %w", err) } + if opts.IncludeInitialState { + go emitContainerSnapshots(ctx, containers, eventCh) + } + for idx := range containers { container := containers[idx] if !isRunningContainer(&container) { @@ -182,6 +186,99 @@ func aggregatedEventFromContainerEvent(ev clabruntime.ContainerEvent) aggregated } } +func emitContainerSnapshots( + ctx context.Context, + containers []clabruntime.GenericContainer, + sink chan<- aggregatedEvent, +) { + for idx := range containers { + container := containers[idx] + if !isRunningContainer(&container) { + continue + } + + event := aggregatedEventFromContainerSnapshot(&container) + if event.ActorID == "" && event.ActorName == "" { + continue + } + + select { + case sink <- event: + case <-ctx.Done(): + return + } + } +} + +func aggregatedEventFromContainerSnapshot( + container *clabruntime.GenericContainer, +) aggregatedEvent { + if container == nil { + return aggregatedEvent{} + } + + state := strings.ToLower(container.State) + + short := container.ShortID + if short == "" { + short = shortID(container.ID) + } + + attributes := cloneStringMap(container.Labels) + if attributes == nil { + attributes = make(map[string]string) + } + + if _, ok := attributes["origin"]; !ok { + attributes["origin"] = "snapshot" + } + + if container.Image != "" { + attributes["image"] = container.Image + } + + if container.Status != "" { + attributes["status"] = container.Status + } + + if state != "" { + attributes["state"] = state + } + + if container.NetworkName != "" { + attributes["network"] = container.NetworkName + } + + if container.NetworkSettings.IPv4addr != "" { + attributes["mgmt_ipv4"] = container.GetContainerIPv4() + } + + if container.NetworkSettings.IPv6addr != "" { + attributes["mgmt_ipv6"] = container.GetContainerIPv6() + } + + if len(attributes) == 0 { + attributes = nil + } + + actorName := firstContainerName(container) + + action := state + if action == "" { + action = "snapshot" + } + + return aggregatedEvent{ + Timestamp: time.Now(), + Type: "container", + Action: action, + ActorID: shortID(short), + ActorName: actorName, + ActorFullID: container.ID, + Attributes: attributes, + } +} + func cloneStringMap(input map[string]string) map[string]string { if len(input) == 0 { return nil diff --git a/docs/cmd/events.md b/docs/cmd/events.md index 4326dae59..0cc6dd7ab 100644 --- a/docs/cmd/events.md +++ b/docs/cmd/events.md @@ -10,7 +10,12 @@ The `events` command streams lifecycle updates for every Containerlab resource a **aliases:** `ev` -The command respects the global flags such as `--runtime`, `--debug`, or `--log-level`. It adds a local `--format` option that controls the output representation. When invoked with no arguments it discovers all running labs and immediately begins streaming events; new labs that start after the command begins are picked up automatically. +The command respects the global flags such as `--runtime`, `--debug`, or `--log-level`. It adds local options: + +- `--format` controls the output representation (`plain`, `json`). +- `--initial-state` emits a snapshot of currently running containers and their interface states before following live updates. + +When invoked with no arguments it discovers all running labs and immediately begins streaming events; new labs that start after the command begins are picked up automatically. ### Event format @@ -20,9 +25,9 @@ In the default `plain` format every line mirrors the `docker events` format: (=, ...) ``` -* **Runtime events** show the short container ID as the actor and include the original attributes supplied by the container runtime (for example `image`, `name`, `containerlab`, `scope`, …). +* **Runtime events** show the short container ID as the actor and include the original attributes supplied by the container runtime (for example `image`, `name`, `containerlab`, `scope`, …). When `--initial-state` is enabled the stream starts with `container ` snapshots (for example `container running`) that carry an `origin=snapshot` attribute. * **Interface events** use `type` `interface` and `origin=netlink` in the attribute list. They also report interface-specific data such as `ifname`, `state`, `mtu`, `mac`, `type`, `alias`, and the lab label. The actor is still the container short ID, and the container name is supplied in the attributes (`name=...`). -* Interface notifications are emitted when a link appears, disappears, or when its relevant properties (operational state, MTU, alias, MAC address, type) change. +* Interface notifications are emitted when a link appears, disappears, or when its relevant properties (operational state, MTU, alias, MAC address, type) change. Initial snapshots use the `snapshot` action when `--initial-state` is requested. When `--format json` is used, each event becomes a single JSON object on its own line. The fields match the plain output (`timestamp`, `type`, `action`, `actor_id`, `actor_name`, `actor_full_id`) and include an `attributes` map with the same key/value pairs that the plain formatter prints. @@ -41,6 +46,17 @@ $ sudo containerlab events The stream contains all currently running labs and stays active to capture subsequent deployments, restarts, or interface adjustments. +#### Include existing resources in the stream + +``` +$ sudo containerlab events --initial-state +2024-07-01T11:02:55.912345000Z container running 5d0b5a9ad3f1 (containerlab=frr-lab, image=ghcr.io/srl-labs/frr, name=clab-frr-lab-frr01, origin=snapshot, state=running) +2024-07-01T11:02:55.912678000Z interface snapshot 5d0b5a9ad3f1 (ifname=eth0, index=22, lab=frr-lab, mac=02:42:ac:14:00:02, mtu=1500, name=clab-frr-lab-frr01, origin=netlink, state=up, type=veth) +… +``` + +This mode begins with a point-in-time view of every running container and interface before switching to live updates. + #### Use with alternative runtimes Containerlab streams events from the runtime selected via the global `--runtime` flag. diff --git a/tests/01-smoke/26-events.robot b/tests/01-smoke/26-events.robot index dd3207825..1033c1602 100644 --- a/tests/01-smoke/26-events.robot +++ b/tests/01-smoke/26-events.robot @@ -42,6 +42,29 @@ Events Command Streams Plain Output Remove File If Exists ${plain-err} END +Events Command Emits Initial State Snapshot + [Documentation] Verify that enabling --initial-state emits running containers and their interfaces before live updates. + ${snapshot-log} Set Variable /tmp/clab-events-snapshot.log + ${snapshot-err} Set Variable /tmp/clab-events-snapshot.err + Remove File If Exists ${snapshot-log} + Remove File If Exists ${snapshot-err} + TRY + Deploy Lab For Events + Sleep 3s + Start Events Process events_snapshot plain ${snapshot-log} ${snapshot-err} True + Sleep 3s + Stop Events Process events_snapshot + ${snapshot-output} = Get File ${snapshot-log} + Log ${snapshot-output} + Should Contain ${snapshot-output} container running + Should Contain ${snapshot-output} origin=snapshot + Should Contain ${snapshot-output} interface snapshot + FINALLY + Cleanup Events Scenario events_snapshot + Remove File If Exists ${snapshot-log} + Remove File If Exists ${snapshot-err} + END + Events Command Streams JSON Output [Documentation] Verify that JSON formatted events remain valid JSON lines and retain interface metadata. ${json-log} Set Variable /tmp/clab-events-json.log @@ -81,8 +104,9 @@ Remove File If Exists Run Keyword And Ignore Error Remove File ${path} Start Events Process - [Arguments] ${alias} ${format} ${stdout} ${stderr} + [Arguments] ${alias} ${format} ${stdout} ${stderr} ${initial}=False ${cmd} Set Variable ${CLAB_BIN} --runtime ${runtime} events --format ${format} + ${cmd} Run Keyword If '${initial}'=='True' Catenate ${cmd} --initial-state ELSE Set Variable ${cmd} Start Process ${cmd} shell=True alias=${alias} stdout=${stdout} stderr=${stderr} Sleep 1s From c1802f18e2851464f71ff1221167c49c29cd24e6 Mon Sep 17 00:00:00 2001 From: Flosch62 Date: Sun, 19 Oct 2025 18:14:54 +0200 Subject: [PATCH 09/10] Added management IP attributes --- core/events/stream.go | 119 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 117 insertions(+), 2 deletions(-) diff --git a/core/events/stream.go b/core/events/stream.go index ef8abe315..a9177b8f1 100644 --- a/core/events/stream.go +++ b/core/events/stream.go @@ -11,6 +11,7 @@ import ( clabconstants "github.com/srl-labs/containerlab/constants" clabcore "github.com/srl-labs/containerlab/core" clabruntime "github.com/srl-labs/containerlab/runtime" + clabtypes "github.com/srl-labs/containerlab/types" ) // Stream subscribes to the selected runtime and netlink sources and forwards @@ -132,7 +133,7 @@ func forwardRuntimeEvents( registry.HandleContainerEvent(runtime, ev) - aggregated := aggregatedEventFromContainerEvent(ev) + aggregated := aggregatedEventFromContainerEvent(ctx, runtime, ev) select { case eventSink <- aggregated: case <-ctx.Done(): @@ -142,7 +143,11 @@ func forwardRuntimeEvents( } } -func aggregatedEventFromContainerEvent(ev clabruntime.ContainerEvent) aggregatedEvent { +func aggregatedEventFromContainerEvent( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + ev clabruntime.ContainerEvent, +) aggregatedEvent { ts := ev.Timestamp if ts.IsZero() { ts = time.Now() @@ -160,6 +165,8 @@ func aggregatedEventFromContainerEvent(ev clabruntime.ContainerEvent) aggregated actorName = attributes["name"] } + attributes = ensureMgmtIPAttributes(ctx, runtime, attributes, actorFullID, actorName) + short := ev.ActorID if short == "" { short = actorFullID @@ -186,6 +193,114 @@ func aggregatedEventFromContainerEvent(ev clabruntime.ContainerEvent) aggregated } } +func ensureMgmtIPAttributes( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + attributes map[string]string, + actorFullID, actorName string, +) map[string]string { + if runtime == nil { + return attributes + } + + hasIPv4 := attributes != nil && attributes["mgmt_ipv4"] != "" + hasIPv6 := attributes != nil && attributes["mgmt_ipv6"] != "" + + if hasIPv4 && hasIPv6 { + return attributes + } + + filters := make([]*clabtypes.GenericFilter, 0, 2) + + if actorName == "" && attributes != nil { + actorName = attributes["name"] + } + + if actorName != "" { + filters = append(filters, &clabtypes.GenericFilter{FilterType: "name", Match: actorName}) + } + + if actorFullID != "" { + filters = append(filters, &clabtypes.GenericFilter{FilterType: "id", Match: actorFullID}) + } + + if len(filters) == 0 { + return attributes + } + + containers, err := runtime.ListContainers(ctx, filters) + if err != nil { + log.Debugf("failed to resolve container for event: %v", err) + + return attributes + } + + container := selectContainerForEvent(containers, actorFullID, actorName) + if container == nil { + return attributes + } + + if attributes == nil { + attributes = make(map[string]string) + } + + if !hasIPv4 { + if ipv4 := container.GetContainerIPv4(); ipv4 != "" && ipv4 != clabconstants.NotApplicable { + attributes["mgmt_ipv4"] = ipv4 + } + } + + if !hasIPv6 { + if ipv6 := container.GetContainerIPv6(); ipv6 != "" && ipv6 != clabconstants.NotApplicable { + attributes["mgmt_ipv6"] = ipv6 + } + } + + if len(attributes) == 0 { + return nil + } + + return attributes +} + +func selectContainerForEvent( + containers []clabruntime.GenericContainer, + actorFullID, actorName string, +) *clabruntime.GenericContainer { + if len(containers) == 0 { + return nil + } + + if actorFullID != "" { + for idx := range containers { + container := &containers[idx] + + switch { + case container.ID == actorFullID: + return container + case strings.HasPrefix(container.ID, actorFullID): + return container + case container.ShortID == actorFullID: + return container + } + } + } + + if actorName != "" { + for idx := range containers { + container := &containers[idx] + + for _, name := range container.Names { + if name == actorName { + return container + } + } + } + } + + return &containers[0] +} + func emitContainerSnapshots( ctx context.Context, containers []clabruntime.GenericContainer, From f22bda1a7002dc2bb2aabe3d63b787fa3dd05221 Mon Sep 17 00:00:00 2001 From: flosch62 Date: Thu, 23 Oct 2025 09:38:41 +0200 Subject: [PATCH 10/10] also sending interface stats each 1s --- core/events/netlink.go | 259 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 245 insertions(+), 14 deletions(-) diff --git a/core/events/netlink.go b/core/events/netlink.go index c9029cd91..258b9e851 100644 --- a/core/events/netlink.go +++ b/core/events/netlink.go @@ -241,12 +241,28 @@ func (w *netlinkWatcher) run(ctx context.Context, registry *netlinkRegistry) { } defer nsHandle.Close() - states, err := snapshotInterfaces(nsHandle) + netHandle, err := netlink.NewHandleAt(nsHandle) + if err != nil { + log.Debugf("failed to create netlink handle for container %s: %v", containerName, err) + + return + } + defer netHandle.Close() + + states, err := snapshotInterfaces(netHandle) if err != nil { log.Debugf("failed to snapshot interfaces for container %s: %v", containerName, err) states = make(map[int]ifaceSnapshot) } + statsSamples := make(map[int]ifaceStatsSample, len(states)) + now := time.Now() + for idx, snapshot := range states { + if sample, ok := newStatsSample(snapshot, now); ok { + statsSamples[idx] = sample + } + } + if w.includeSnapshot { for _, snapshot := range states { registry.emitInterfaceEvent(w.container, "snapshot", snapshot) @@ -263,8 +279,13 @@ func (w *netlinkWatcher) run(ctx context.Context, registry *netlinkRegistry) { return } + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { select { + case <-ticker.C: + w.collectAndEmitStats(netHandle, states, statsSamples, registry) case <-ctx.Done(): close(done) @@ -274,13 +295,14 @@ func (w *netlinkWatcher) run(ctx context.Context, registry *netlinkRegistry) { return } - w.processUpdate(states, update, registry) + w.processUpdate(states, statsSamples, update, registry) } } } func (w *netlinkWatcher) processUpdate( states map[int]ifaceSnapshot, + statsSamples map[int]ifaceStatsSample, update netlink.LinkUpdate, registry *netlinkRegistry, ) { @@ -303,6 +325,7 @@ func (w *netlinkWatcher) processUpdate( } delete(states, snapshot.Index) + delete(statsSamples, snapshot.Index) registry.emitInterfaceEvent(w.container, "delete", snapshot) case unix.RTM_NEWLINK: if exists && snapshot.equal(previous) { @@ -315,6 +338,9 @@ func (w *netlinkWatcher) processUpdate( } states[snapshot.Index] = snapshot + if sample, ok := newStatsSample(snapshot, time.Now()); ok { + statsSamples[snapshot.Index] = sample + } registry.emitInterfaceEvent(w.container, action, snapshot) } } @@ -343,15 +369,10 @@ func shortID(id string) string { return id } -func (r *netlinkRegistry) emitInterfaceEvent( +func interfaceAttributes( container *clabruntime.GenericContainer, - action string, snapshot ifaceSnapshot, -) { - if container == nil { - return - } - +) map[string]string { attributes := map[string]string{ "ifname": snapshot.Name, "index": strconv.Itoa(snapshot.Index), @@ -377,6 +398,20 @@ func (r *netlinkRegistry) emitInterfaceEvent( attributes["name"] = name } + return attributes +} + +func (r *netlinkRegistry) emitInterfaceEvent( + container *clabruntime.GenericContainer, + action string, + snapshot ifaceSnapshot, +) { + if container == nil { + return + } + + attributes := interfaceAttributes(container, snapshot) + event := aggregatedEvent{ Timestamp: time.Now(), Type: "interface", @@ -429,12 +464,46 @@ func waitForNamespacePath( return "", fmt.Errorf("namespace path not ready for container %s", containerID) } -func snapshotInterfaces(nsHandle netns.NsHandle) (map[int]ifaceSnapshot, error) { - netHandle, err := netlink.NewHandleAt(nsHandle) - if err != nil { - return nil, fmt.Errorf("unable to enter namespace: %w", err) +func (r *netlinkRegistry) emitInterfaceStatsEvent( + container *clabruntime.GenericContainer, + snapshot ifaceSnapshot, + metrics ifaceStatsMetrics, +) { + if container == nil || !snapshot.HasStats { + return + } + + attributes := interfaceAttributes(container, snapshot) + attributes["rx_bytes"] = strconv.FormatUint(metrics.RxBytes, 10) + attributes["tx_bytes"] = strconv.FormatUint(metrics.TxBytes, 10) + attributes["rx_packets"] = strconv.FormatUint(metrics.RxPackets, 10) + attributes["tx_packets"] = strconv.FormatUint(metrics.TxPackets, 10) + attributes["rx_bps"] = strconv.FormatFloat(metrics.RxBps, 'f', -1, 64) + attributes["tx_bps"] = strconv.FormatFloat(metrics.TxBps, 'f', -1, 64) + attributes["rx_pps"] = strconv.FormatFloat(metrics.RxPps, 'f', -1, 64) + attributes["tx_pps"] = strconv.FormatFloat(metrics.TxPps, 'f', -1, 64) + attributes["interval_seconds"] = strconv.FormatFloat(metrics.Interval.Seconds(), 'f', -1, 64) + + event := aggregatedEvent{ + Timestamp: metrics.Timestamp, + Type: "interface", + Action: "stats", + ActorID: container.ShortID, + ActorName: firstContainerName(container), + ActorFullID: container.ID, + Attributes: attributes, + } + + select { + case r.events <- event: + case <-r.ctx.Done(): + } +} + +func snapshotInterfaces(netHandle *netlink.Handle) (map[int]ifaceSnapshot, error) { + if netHandle == nil { + return nil, fmt.Errorf("netlink handle is nil") } - defer netHandle.Close() links, err := netHandle.LinkList() if err != nil { @@ -466,6 +535,13 @@ func snapshotFromLink(link netlink.Link) ifaceSnapshot { snapshot.MAC = attrs.HardwareAddr.String() } snapshot.OperState = attrs.OperState.String() + if stats := attrs.Statistics; stats != nil { + snapshot.HasStats = true + snapshot.RxBytes = stats.RxBytes + snapshot.TxBytes = stats.TxBytes + snapshot.RxPackets = stats.RxPackets + snapshot.TxPackets = stats.TxPackets + } } return snapshot @@ -479,6 +555,11 @@ type ifaceSnapshot struct { MAC string OperState string Type string + HasStats bool + RxBytes uint64 + TxBytes uint64 + RxPackets uint64 + TxPackets uint64 } func (s ifaceSnapshot) equal(other ifaceSnapshot) bool { @@ -490,3 +571,153 @@ func (s ifaceSnapshot) equal(other ifaceSnapshot) bool { s.OperState == other.OperState && s.Type == other.Type } + +type ifaceStatsSample struct { + RxBytes uint64 + TxBytes uint64 + RxPackets uint64 + TxPackets uint64 + Timestamp time.Time +} + +type ifaceStatsMetrics struct { + RxBytes uint64 + TxBytes uint64 + RxPackets uint64 + TxPackets uint64 + RxBps float64 + TxBps float64 + RxPps float64 + TxPps float64 + Interval time.Duration + Timestamp time.Time +} + +func newStatsSample(snapshot ifaceSnapshot, timestamp time.Time) (ifaceStatsSample, bool) { + if !snapshot.HasStats { + return ifaceStatsSample{}, false + } + + return ifaceStatsSample{ + RxBytes: snapshot.RxBytes, + TxBytes: snapshot.TxBytes, + RxPackets: snapshot.RxPackets, + TxPackets: snapshot.TxPackets, + Timestamp: timestamp, + }, true +} + +func (w *netlinkWatcher) collectAndEmitStats( + netHandle *netlink.Handle, + states map[int]ifaceSnapshot, + statsSamples map[int]ifaceStatsSample, + registry *netlinkRegistry, +) { + if netHandle == nil { + return + } + + now := time.Now() + + for idx, state := range states { + link, err := netHandle.LinkByIndex(idx) + if err != nil { + continue + } + + current := snapshotFromLink(link) + state.Name = current.Name + state.Alias = current.Alias + state.MTU = current.MTU + state.MAC = current.MAC + state.OperState = current.OperState + state.Type = current.Type + state.HasStats = current.HasStats + state.RxBytes = current.RxBytes + state.TxBytes = current.TxBytes + state.RxPackets = current.RxPackets + state.TxPackets = current.TxPackets + states[idx] = state + + if !state.HasStats { + delete(statsSamples, idx) + + continue + } + + prev, ok := statsSamples[idx] + if !ok || now.Sub(prev.Timestamp) <= 0 { + statsSamples[idx] = ifaceStatsSample{ + RxBytes: state.RxBytes, + TxBytes: state.TxBytes, + RxPackets: state.RxPackets, + TxPackets: state.TxPackets, + Timestamp: now, + } + + continue + } + + interval := now.Sub(prev.Timestamp) + if interval <= 0 { + statsSamples[idx] = ifaceStatsSample{ + RxBytes: state.RxBytes, + TxBytes: state.TxBytes, + RxPackets: state.RxPackets, + TxPackets: state.TxPackets, + Timestamp: now, + } + + continue + } + + rxBytesDelta := deltaCounter(prev.RxBytes, state.RxBytes) + txBytesDelta := deltaCounter(prev.TxBytes, state.TxBytes) + rxPacketsDelta := deltaCounter(prev.RxPackets, state.RxPackets) + txPacketsDelta := deltaCounter(prev.TxPackets, state.TxPackets) + + seconds := interval.Seconds() + if seconds <= 0 { + statsSamples[idx] = ifaceStatsSample{ + RxBytes: state.RxBytes, + TxBytes: state.TxBytes, + RxPackets: state.RxPackets, + TxPackets: state.TxPackets, + Timestamp: now, + } + + continue + } + + metrics := ifaceStatsMetrics{ + RxBytes: state.RxBytes, + TxBytes: state.TxBytes, + RxPackets: state.RxPackets, + TxPackets: state.TxPackets, + RxBps: float64(rxBytesDelta) * 8 / seconds, + TxBps: float64(txBytesDelta) * 8 / seconds, + RxPps: float64(rxPacketsDelta) / seconds, + TxPps: float64(txPacketsDelta) / seconds, + Interval: interval, + Timestamp: now, + } + + registry.emitInterfaceStatsEvent(w.container, state, metrics) + + statsSamples[idx] = ifaceStatsSample{ + RxBytes: state.RxBytes, + TxBytes: state.TxBytes, + RxPackets: state.RxPackets, + TxPackets: state.TxPackets, + Timestamp: now, + } + } +} + +func deltaCounter(previous, current uint64) uint64 { + if current >= previous { + return current - previous + } + + return current +}