diff --git a/cmd/events.go b/cmd/events.go new file mode 100644 index 000000000..0330d71da --- /dev/null +++ b/cmd/events.go @@ -0,0 +1,59 @@ +package cmd + +import ( + "github.com/spf13/cobra" + clabevents "github.com/srl-labs/containerlab/core/events" + clabutils "github.com/srl-labs/containerlab/utils" +) + +func eventsCmd(o *Options) (*cobra.Command, error) { + c := &cobra.Command{ + Use: "events", + Short: "stream lab lifecycle and interface events", + Long: "stream container runtime events and interface updates for all running labs using the selected runtime\n" + + "reference: https://containerlab.dev/cmd/events/", + Aliases: []string{"ev"}, + PreRunE: func(*cobra.Command, []string) error { + return clabutils.CheckAndGetRootPrivs() + }, + RunE: func(cmd *cobra.Command, _ []string) error { + return eventsFn(cmd, o) + }, + } + + c.Flags().StringVarP( + &o.Events.Format, + "format", + "f", + o.Events.Format, + "output format. One of [plain, json]", + ) + + c.Flags().BoolVarP( + &o.Events.IncludeInitialState, + "initial-state", + "i", + o.Events.IncludeInitialState, + "emit the current container and interface states before streaming new events", + ) + + c.Example = `# Stream container and interface events in plain text +containerlab events + +# Stream events as JSON +containerlab events --format json` + + return c, nil +} + +func eventsFn(cmd *cobra.Command, o *Options) error { + opts := clabevents.Options{ + Format: o.Events.Format, + Runtime: o.Global.Runtime, + IncludeInitialState: o.Events.IncludeInitialState, + ClabOptions: o.ToClabOptions(), + Writer: cmd.OutOrStdout(), + } + + return clabevents.Stream(cmd.Context(), opts) +} diff --git a/cmd/options.go b/cmd/options.go index d77814afd..6f7dba54c 100644 --- a/cmd/options.go +++ b/cmd/options.go @@ -52,6 +52,9 @@ func GetOptions() *Options { MermaidDirection: "TD", DrawIOVersion: "latest", }, + Events: &EventsOptions{ + Format: "plain", + }, ToolsAPI: &ToolsApiOptions{ Image: "ghcr.io/srl-labs/clab-api-server/clab-api-server:latest", Name: "clab-api-server", @@ -121,6 +124,7 @@ type Options struct { Exec *ExecOptions Inspect *InspectOptions Graph *GraphOptions + Events *EventsOptions ToolsAPI *ToolsApiOptions ToolsCert *ToolsCertOptions ToolsTxOffload *ToolsDisableTxOffloadOptions @@ -353,6 +357,11 @@ type GraphOptions struct { StaticDirectory string } +type EventsOptions struct { + Format string + IncludeInitialState bool +} + type ToolsApiOptions struct { Image string Name string diff --git a/cmd/root.go b/cmd/root.go index 79bd46642..7e0bd70a5 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -29,6 +29,7 @@ func subcommandRegisterFuncs() []func(*Options) (*cobra.Command, error) { execCmd, generateCmd, graphCmd, + eventsCmd, inspectCmd, redeployCmd, saveCmd, diff --git a/core/events/formatter.go b/core/events/formatter.go new file mode 100644 index 000000000..9620f16b9 --- /dev/null +++ b/core/events/formatter.go @@ -0,0 +1,108 @@ +package events + +import ( + "encoding/json" + "fmt" + "io" + "sort" + "strings" + "time" +) + +type formatter func(aggregatedEvent) error + +func newFormatter(format string, w io.Writer) (formatter, error) { + normalized := strings.TrimSpace(strings.ToLower(format)) + if normalized == "" { + normalized = "plain" + } + + switch normalized { + case "plain": + return plainFormatter(w), nil + case "json": + return jsonFormatter(w), nil + default: + return nil, fmt.Errorf("output format %q is not supported, use 'plain' or 'json'", format) + } +} + +func plainFormatter(w io.Writer) formatter { + return func(ev aggregatedEvent) error { + ts := ev.Timestamp + if ts.IsZero() { + ts = time.Now() + } + ts = ts.UTC() + + actor := ev.ActorID + if actor == "" { + actor = ev.ActorName + } + if actor == "" { + actor = "-" + } + + attrs := mergedEventAttributes(ev) + keys := make([]string, 0, len(attrs)) + for k := range attrs { + keys = append(keys, k) + } + sort.Strings(keys) + + parts := make([]string, 0, len(keys)) + for _, k := range keys { + parts = append(parts, fmt.Sprintf("%s=%s", k, attrs[k])) + } + + suffix := "" + if len(parts) > 0 { + suffix = " (" + strings.Join(parts, ", ") + ")" + } + + _, err := fmt.Fprintf(w, "%s %s %s %s%s\n", ts.Format(time.RFC3339Nano), ev.Type, ev.Action, actor, suffix) + + return err + } +} + +func jsonFormatter(w io.Writer) formatter { + encoder := json.NewEncoder(w) + encoder.SetEscapeHTML(false) + + return func(ev aggregatedEvent) error { + copy := ev + copy.Attributes = mergedEventAttributes(ev) + + return encoder.Encode(copy) + } +} + +func mergedEventAttributes(ev aggregatedEvent) map[string]string { + if len(ev.Attributes) == 0 && ev.ActorName == "" && ev.ActorFullID == "" { + return nil + } + + attrs := make(map[string]string, len(ev.Attributes)+2) + for k, v := range ev.Attributes { + if v == "" { + continue + } + + attrs[k] = v + } + + if ev.ActorName != "" { + attrs["name"] = ev.ActorName + } + + if ev.ActorFullID != "" { + attrs["id"] = ev.ActorFullID + } + + if len(attrs) == 0 { + return nil + } + + return attrs +} diff --git a/core/events/netlink.go b/core/events/netlink.go new file mode 100644 index 000000000..258b9e851 --- /dev/null +++ b/core/events/netlink.go @@ -0,0 +1,723 @@ +package events + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/charmbracelet/log" + clabconstants "github.com/srl-labs/containerlab/constants" + clabruntime "github.com/srl-labs/containerlab/runtime" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netns" + "golang.org/x/sys/unix" +) + +type netlinkRegistry struct { + ctx context.Context + mu sync.Mutex + watchers map[string]*netlinkWatcher + events chan<- aggregatedEvent + includeInitialSnapshot bool +} + +func newNetlinkRegistry(ctx context.Context, events chan<- aggregatedEvent, includeInitialSnapshot bool) *netlinkRegistry { + return &netlinkRegistry{ + ctx: ctx, + watchers: make(map[string]*netlinkWatcher), + events: events, + includeInitialSnapshot: includeInitialSnapshot, + } +} + +func (r *netlinkRegistry) Start(container *clabruntime.GenericContainer) { + clone := cloneContainer(container) + if clone == nil { + return + } + + id := clone.ID + if id == "" { + id = clone.ShortID + } + + if id == "" { + return + } + + r.mu.Lock() + if _, exists := r.watchers[id]; exists { + r.mu.Unlock() + + return + } + + watcherCtx, cancel := context.WithCancel(r.ctx) + watcher := &netlinkWatcher{ + container: clone, + cancel: cancel, + done: make(chan struct{}), + includeSnapshot: r.includeInitialSnapshot, + } + + r.watchers[id] = watcher + r.mu.Unlock() + + go watcher.run(watcherCtx, r) +} + +func (r *netlinkRegistry) Stop(id string) { + if id == "" { + return + } + + r.mu.Lock() + watcher, ok := r.watchers[id] + if ok { + delete(r.watchers, id) + } + r.mu.Unlock() + + if !ok { + return + } + + watcher.cancel() + <-watcher.done +} + +func (r *netlinkRegistry) remove(id string, watcher *netlinkWatcher) { + if id == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + if current, ok := r.watchers[id]; ok && current == watcher { + delete(r.watchers, id) + } +} + +func (r *netlinkRegistry) HandleContainerEvent( + runtime clabruntime.ContainerRuntime, + ev clabruntime.ContainerEvent, +) { + if !strings.EqualFold(ev.Type, clabruntime.EventTypeContainer) { + return + } + + action := strings.ToLower(ev.Action) + + switch action { + case clabruntime.EventActionStart, clabruntime.EventActionUnpause, clabruntime.EventActionRestart: + container := containerFromEvent(runtime, ev) + if container != nil { + r.Start(container) + } + case clabruntime.EventActionDie, clabruntime.EventActionStop, clabruntime.EventActionDestroy, clabruntime.EventActionKill: + id := ev.ActorFullID + if id == "" { + id = ev.ActorID + } + + r.Stop(id) + } +} + +func cloneContainer(container *clabruntime.GenericContainer) *clabruntime.GenericContainer { + if container == nil { + return nil + } + + clone := &clabruntime.GenericContainer{ + Names: append([]string{}, container.Names...), + ID: container.ID, + ShortID: container.ShortID, + Labels: cloneStringMap(container.Labels), + } + + if clone.ShortID == "" { + clone.ShortID = shortID(clone.ID) + } + + if container.Runtime == nil { + return nil + } + + clone.SetRuntime(container.Runtime) + + return clone +} + +func containerFromEvent( + runtime clabruntime.ContainerRuntime, + ev clabruntime.ContainerEvent, +) *clabruntime.GenericContainer { + attributes := ev.Attributes + + name := ev.ActorName + if name == "" && attributes != nil { + name = attributes["name"] + } + + id := ev.ActorFullID + if id == "" { + id = ev.ActorID + } + + if id == "" && name == "" { + return nil + } + + short := ev.ActorID + if short == "" { + short = id + } + + container := &clabruntime.GenericContainer{ + ID: id, + ShortID: shortID(short), + } + + if name != "" { + container.Names = []string{name} + } + + if attributes != nil { + if lab := attributes[clabconstants.Containerlab]; lab != "" { + container.Labels = map[string]string{clabconstants.Containerlab: lab} + } + } + + if container.ShortID == "" { + container.ShortID = shortID(container.ID) + } + + if runtime != nil { + container.SetRuntime(runtime) + } + + return container +} + +type netlinkWatcher struct { + container *clabruntime.GenericContainer + cancel context.CancelFunc + done chan struct{} + includeSnapshot bool +} + +func (w *netlinkWatcher) run(ctx context.Context, registry *netlinkRegistry) { + defer close(w.done) + if w.container == nil { + return + } + + defer registry.remove(w.container.ID, w) + + containerName := firstContainerName(w.container) + if w.container.Runtime == nil { + log.Debugf("container %s has no runtime, skipping netlink watcher", containerName) + + return + } + + nsPath, err := waitForNamespacePath(ctx, w.container.Runtime, w.container.ID) + if err != nil || nsPath == "" { + log.Debugf("failed to resolve netns for container %s: %v", containerName, err) + + return + } + + nsHandle, err := netns.GetFromPath(nsPath) + if err != nil { + log.Debugf("failed to open netns for container %s: %v", containerName, err) + + return + } + defer nsHandle.Close() + + netHandle, err := netlink.NewHandleAt(nsHandle) + if err != nil { + log.Debugf("failed to create netlink handle for container %s: %v", containerName, err) + + return + } + defer netHandle.Close() + + states, err := snapshotInterfaces(netHandle) + if err != nil { + log.Debugf("failed to snapshot interfaces for container %s: %v", containerName, err) + states = make(map[int]ifaceSnapshot) + } + + statsSamples := make(map[int]ifaceStatsSample, len(states)) + now := time.Now() + for idx, snapshot := range states { + if sample, ok := newStatsSample(snapshot, now); ok { + statsSamples[idx] = sample + } + } + + if w.includeSnapshot { + for _, snapshot := range states { + registry.emitInterfaceEvent(w.container, "snapshot", snapshot) + } + } + + updates := make(chan netlink.LinkUpdate, 32) + done := make(chan struct{}) + opts := netlink.LinkSubscribeOptions{Namespace: &nsHandle} + + if err := netlink.LinkSubscribeWithOptions(updates, done, opts); err != nil { + log.Debugf("failed to subscribe to netlink updates for container %s: %v", containerName, err) + + return + } + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + w.collectAndEmitStats(netHandle, states, statsSamples, registry) + case <-ctx.Done(): + close(done) + + return + case update, ok := <-updates: + if !ok { + return + } + + w.processUpdate(states, statsSamples, update, registry) + } + } +} + +func (w *netlinkWatcher) processUpdate( + states map[int]ifaceSnapshot, + statsSamples map[int]ifaceStatsSample, + update netlink.LinkUpdate, + registry *netlinkRegistry, +) { + if update.Link == nil { + return + } + + attrs := update.Link.Attrs() + if attrs == nil { + return + } + + snapshot := snapshotFromLink(update.Link) + previous, exists := states[snapshot.Index] + + switch update.Header.Type { + case unix.RTM_DELLINK: + if exists { + snapshot = previous + } + + delete(states, snapshot.Index) + delete(statsSamples, snapshot.Index) + registry.emitInterfaceEvent(w.container, "delete", snapshot) + case unix.RTM_NEWLINK: + if exists && snapshot.equal(previous) { + return + } + + action := "create" + if exists { + action = "update" + } + + states[snapshot.Index] = snapshot + if sample, ok := newStatsSample(snapshot, time.Now()); ok { + statsSamples[snapshot.Index] = sample + } + registry.emitInterfaceEvent(w.container, action, snapshot) + } +} + +func firstContainerName(container *clabruntime.GenericContainer) string { + if container == nil || len(container.Names) == 0 { + return "" + } + + return container.Names[0] +} + +func containerLabel(container *clabruntime.GenericContainer) string { + if container == nil || container.Labels == nil { + return "" + } + + return container.Labels[clabconstants.Containerlab] +} + +func shortID(id string) string { + if len(id) > 12 { + return id[:12] + } + + return id +} + +func interfaceAttributes( + container *clabruntime.GenericContainer, + snapshot ifaceSnapshot, +) map[string]string { + attributes := map[string]string{ + "ifname": snapshot.Name, + "index": strconv.Itoa(snapshot.Index), + "mtu": strconv.Itoa(snapshot.MTU), + "state": snapshot.OperState, + "type": snapshot.Type, + "origin": "netlink", + } + + if snapshot.Alias != "" { + attributes["alias"] = snapshot.Alias + } + + if snapshot.MAC != "" { + attributes["mac"] = snapshot.MAC + } + + if lab := containerLabel(container); lab != "" { + attributes["lab"] = lab + } + + if name := firstContainerName(container); name != "" { + attributes["name"] = name + } + + return attributes +} + +func (r *netlinkRegistry) emitInterfaceEvent( + container *clabruntime.GenericContainer, + action string, + snapshot ifaceSnapshot, +) { + if container == nil { + return + } + + attributes := interfaceAttributes(container, snapshot) + + event := aggregatedEvent{ + Timestamp: time.Now(), + Type: "interface", + Action: action, + ActorID: container.ShortID, + ActorName: firstContainerName(container), + ActorFullID: container.ID, + Attributes: attributes, + } + + select { + case r.events <- event: + case <-r.ctx.Done(): + } +} + +func waitForNamespacePath( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + containerID string, +) (string, error) { + const ( + attempts = 5 + retryDelay = 200 * time.Millisecond + ) + + var lastErr error + + for i := 0; i < attempts; i++ { + nsPath, err := runtime.GetNSPath(ctx, containerID) + if err == nil && nsPath != "" { + return nsPath, nil + } + + if err != nil { + lastErr = err + } + + select { + case <-time.After(retryDelay): + case <-ctx.Done(): + return "", ctx.Err() + } + } + + if lastErr != nil { + return "", lastErr + } + + return "", fmt.Errorf("namespace path not ready for container %s", containerID) +} + +func (r *netlinkRegistry) emitInterfaceStatsEvent( + container *clabruntime.GenericContainer, + snapshot ifaceSnapshot, + metrics ifaceStatsMetrics, +) { + if container == nil || !snapshot.HasStats { + return + } + + attributes := interfaceAttributes(container, snapshot) + attributes["rx_bytes"] = strconv.FormatUint(metrics.RxBytes, 10) + attributes["tx_bytes"] = strconv.FormatUint(metrics.TxBytes, 10) + attributes["rx_packets"] = strconv.FormatUint(metrics.RxPackets, 10) + attributes["tx_packets"] = strconv.FormatUint(metrics.TxPackets, 10) + attributes["rx_bps"] = strconv.FormatFloat(metrics.RxBps, 'f', -1, 64) + attributes["tx_bps"] = strconv.FormatFloat(metrics.TxBps, 'f', -1, 64) + attributes["rx_pps"] = strconv.FormatFloat(metrics.RxPps, 'f', -1, 64) + attributes["tx_pps"] = strconv.FormatFloat(metrics.TxPps, 'f', -1, 64) + attributes["interval_seconds"] = strconv.FormatFloat(metrics.Interval.Seconds(), 'f', -1, 64) + + event := aggregatedEvent{ + Timestamp: metrics.Timestamp, + Type: "interface", + Action: "stats", + ActorID: container.ShortID, + ActorName: firstContainerName(container), + ActorFullID: container.ID, + Attributes: attributes, + } + + select { + case r.events <- event: + case <-r.ctx.Done(): + } +} + +func snapshotInterfaces(netHandle *netlink.Handle) (map[int]ifaceSnapshot, error) { + if netHandle == nil { + return nil, fmt.Errorf("netlink handle is nil") + } + + links, err := netHandle.LinkList() + if err != nil { + return nil, fmt.Errorf("unable to list links: %w", err) + } + + states := make(map[int]ifaceSnapshot, len(links)) + for _, link := range links { + snapshot := snapshotFromLink(link) + states[snapshot.Index] = snapshot + } + + return states, nil +} + +func snapshotFromLink(link netlink.Link) ifaceSnapshot { + attrs := link.Attrs() + + snapshot := ifaceSnapshot{ + Type: link.Type(), + } + + if attrs != nil { + snapshot.Index = attrs.Index + snapshot.Name = attrs.Name + snapshot.Alias = attrs.Alias + snapshot.MTU = attrs.MTU + if len(attrs.HardwareAddr) > 0 { + snapshot.MAC = attrs.HardwareAddr.String() + } + snapshot.OperState = attrs.OperState.String() + if stats := attrs.Statistics; stats != nil { + snapshot.HasStats = true + snapshot.RxBytes = stats.RxBytes + snapshot.TxBytes = stats.TxBytes + snapshot.RxPackets = stats.RxPackets + snapshot.TxPackets = stats.TxPackets + } + } + + return snapshot +} + +type ifaceSnapshot struct { + Index int + Name string + Alias string + MTU int + MAC string + OperState string + Type string + HasStats bool + RxBytes uint64 + TxBytes uint64 + RxPackets uint64 + TxPackets uint64 +} + +func (s ifaceSnapshot) equal(other ifaceSnapshot) bool { + return s.Index == other.Index && + s.Name == other.Name && + s.Alias == other.Alias && + s.MTU == other.MTU && + s.MAC == other.MAC && + s.OperState == other.OperState && + s.Type == other.Type +} + +type ifaceStatsSample struct { + RxBytes uint64 + TxBytes uint64 + RxPackets uint64 + TxPackets uint64 + Timestamp time.Time +} + +type ifaceStatsMetrics struct { + RxBytes uint64 + TxBytes uint64 + RxPackets uint64 + TxPackets uint64 + RxBps float64 + TxBps float64 + RxPps float64 + TxPps float64 + Interval time.Duration + Timestamp time.Time +} + +func newStatsSample(snapshot ifaceSnapshot, timestamp time.Time) (ifaceStatsSample, bool) { + if !snapshot.HasStats { + return ifaceStatsSample{}, false + } + + return ifaceStatsSample{ + RxBytes: snapshot.RxBytes, + TxBytes: snapshot.TxBytes, + RxPackets: snapshot.RxPackets, + TxPackets: snapshot.TxPackets, + Timestamp: timestamp, + }, true +} + +func (w *netlinkWatcher) collectAndEmitStats( + netHandle *netlink.Handle, + states map[int]ifaceSnapshot, + statsSamples map[int]ifaceStatsSample, + registry *netlinkRegistry, +) { + if netHandle == nil { + return + } + + now := time.Now() + + for idx, state := range states { + link, err := netHandle.LinkByIndex(idx) + if err != nil { + continue + } + + current := snapshotFromLink(link) + state.Name = current.Name + state.Alias = current.Alias + state.MTU = current.MTU + state.MAC = current.MAC + state.OperState = current.OperState + state.Type = current.Type + state.HasStats = current.HasStats + state.RxBytes = current.RxBytes + state.TxBytes = current.TxBytes + state.RxPackets = current.RxPackets + state.TxPackets = current.TxPackets + states[idx] = state + + if !state.HasStats { + delete(statsSamples, idx) + + continue + } + + prev, ok := statsSamples[idx] + if !ok || now.Sub(prev.Timestamp) <= 0 { + statsSamples[idx] = ifaceStatsSample{ + RxBytes: state.RxBytes, + TxBytes: state.TxBytes, + RxPackets: state.RxPackets, + TxPackets: state.TxPackets, + Timestamp: now, + } + + continue + } + + interval := now.Sub(prev.Timestamp) + if interval <= 0 { + statsSamples[idx] = ifaceStatsSample{ + RxBytes: state.RxBytes, + TxBytes: state.TxBytes, + RxPackets: state.RxPackets, + TxPackets: state.TxPackets, + Timestamp: now, + } + + continue + } + + rxBytesDelta := deltaCounter(prev.RxBytes, state.RxBytes) + txBytesDelta := deltaCounter(prev.TxBytes, state.TxBytes) + rxPacketsDelta := deltaCounter(prev.RxPackets, state.RxPackets) + txPacketsDelta := deltaCounter(prev.TxPackets, state.TxPackets) + + seconds := interval.Seconds() + if seconds <= 0 { + statsSamples[idx] = ifaceStatsSample{ + RxBytes: state.RxBytes, + TxBytes: state.TxBytes, + RxPackets: state.RxPackets, + TxPackets: state.TxPackets, + Timestamp: now, + } + + continue + } + + metrics := ifaceStatsMetrics{ + RxBytes: state.RxBytes, + TxBytes: state.TxBytes, + RxPackets: state.RxPackets, + TxPackets: state.TxPackets, + RxBps: float64(rxBytesDelta) * 8 / seconds, + TxBps: float64(txBytesDelta) * 8 / seconds, + RxPps: float64(rxPacketsDelta) / seconds, + TxPps: float64(txPacketsDelta) / seconds, + Interval: interval, + Timestamp: now, + } + + registry.emitInterfaceStatsEvent(w.container, state, metrics) + + statsSamples[idx] = ifaceStatsSample{ + RxBytes: state.RxBytes, + TxBytes: state.TxBytes, + RxPackets: state.RxPackets, + TxPackets: state.TxPackets, + Timestamp: now, + } + } +} + +func deltaCounter(previous, current uint64) uint64 { + if current >= previous { + return current - previous + } + + return current +} diff --git a/core/events/options.go b/core/events/options.go new file mode 100644 index 000000000..d36f06488 --- /dev/null +++ b/core/events/options.go @@ -0,0 +1,24 @@ +package events + +import ( + "io" + + clabcore "github.com/srl-labs/containerlab/core" +) + +// Options configure how runtime and interface events are sourced and rendered. +type Options struct { + Format string + Runtime string + IncludeInitialState bool + ClabOptions []clabcore.ClabOption + Writer io.Writer +} + +func (o Options) writer() io.Writer { + if o.Writer != nil { + return o.Writer + } + + return io.Discard +} diff --git a/core/events/stream.go b/core/events/stream.go new file mode 100644 index 000000000..a9177b8f1 --- /dev/null +++ b/core/events/stream.go @@ -0,0 +1,416 @@ +package events + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/charmbracelet/log" + clabconstants "github.com/srl-labs/containerlab/constants" + clabcore "github.com/srl-labs/containerlab/core" + clabruntime "github.com/srl-labs/containerlab/runtime" + clabtypes "github.com/srl-labs/containerlab/types" +) + +// Stream subscribes to the selected runtime and netlink sources and forwards +// aggregated events to the configured writer. +func Stream(ctx context.Context, opts Options) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + clab, err := clabcore.NewContainerLab(opts.ClabOptions...) + if err != nil { + return err + } + + runtime, ok := clab.Runtimes[opts.Runtime] + if !ok { + return fmt.Errorf("runtime %q is not initialized", opts.Runtime) + } + + printer, err := newFormatter(opts.Format, opts.writer()) + if err != nil { + return err + } + + eventCh := make(chan aggregatedEvent, 128) + registry := newNetlinkRegistry(ctx, eventCh, opts.IncludeInitialState) + + containers, err := clab.ListContainers(ctx, clabcore.WithListclabLabelExists()) + if err != nil { + return fmt.Errorf("failed to list containers: %w", err) + } + + if opts.IncludeInitialState { + go emitContainerSnapshots(ctx, containers, eventCh) + } + + for idx := range containers { + container := containers[idx] + if !isRunningContainer(&container) { + continue + } + + registry.Start(&container) + } + + streamOpts := clabruntime.EventStreamOptions{ + Labels: map[string]string{ + clabconstants.Containerlab: "", + }, + } + + runtimeEvents, runtimeErrs, err := runtime.StreamEvents(ctx, streamOpts) + if err != nil { + return fmt.Errorf("failed to stream events for runtime %q: %w", opts.Runtime, err) + } + + errCh := make(chan error, 1) + go forwardRuntimeEvents(ctx, runtime, registry, runtimeEvents, runtimeErrs, eventCh, errCh) + + runtimeErrors := errCh + + for { + select { + case ev := <-eventCh: + if err := printer(ev); err != nil { + log.Debugf("failed to write event: %v", err) + } + case err, ok := <-runtimeErrors: + if !ok { + runtimeErrors = nil + + continue + } + + if err != nil && !errors.Is(err, context.Canceled) { + return err + } + case <-ctx.Done(): + return nil + } + } +} + +func forwardRuntimeEvents( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + registry *netlinkRegistry, + runtimeEvents <-chan clabruntime.ContainerEvent, + runtimeErrs <-chan error, + eventSink chan<- aggregatedEvent, + errSink chan<- error, +) { + defer close(errSink) + + sendErr := func(err error) { + select { + case errSink <- err: + case <-ctx.Done(): + } + } + + for { + select { + case <-ctx.Done(): + return + case err, ok := <-runtimeErrs: + if !ok { + return + } + + if err != nil && !errors.Is(err, context.Canceled) { + sendErr(err) + + return + } + case ev, ok := <-runtimeEvents: + if !ok { + return + } + + registry.HandleContainerEvent(runtime, ev) + + aggregated := aggregatedEventFromContainerEvent(ctx, runtime, ev) + select { + case eventSink <- aggregated: + case <-ctx.Done(): + return + } + } + } +} + +func aggregatedEventFromContainerEvent( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + ev clabruntime.ContainerEvent, +) aggregatedEvent { + ts := ev.Timestamp + if ts.IsZero() { + ts = time.Now() + } + + attributes := cloneStringMap(ev.Attributes) + + actorFullID := ev.ActorFullID + if actorFullID == "" { + actorFullID = ev.ActorID + } + + actorName := ev.ActorName + if actorName == "" && attributes != nil { + actorName = attributes["name"] + } + + attributes = ensureMgmtIPAttributes(ctx, runtime, attributes, actorFullID, actorName) + + short := ev.ActorID + if short == "" { + short = actorFullID + } + + action := strings.ToLower(ev.Action) + if action == "" { + action = ev.Action + } + + eventType := strings.ToLower(ev.Type) + if eventType == "" { + eventType = ev.Type + } + + return aggregatedEvent{ + Timestamp: ts, + Type: eventType, + Action: action, + ActorID: shortID(short), + ActorName: actorName, + ActorFullID: actorFullID, + Attributes: attributes, + } +} + +func ensureMgmtIPAttributes( + ctx context.Context, + runtime clabruntime.ContainerRuntime, + attributes map[string]string, + actorFullID, actorName string, +) map[string]string { + if runtime == nil { + return attributes + } + + hasIPv4 := attributes != nil && attributes["mgmt_ipv4"] != "" + hasIPv6 := attributes != nil && attributes["mgmt_ipv6"] != "" + + if hasIPv4 && hasIPv6 { + return attributes + } + + filters := make([]*clabtypes.GenericFilter, 0, 2) + + if actorName == "" && attributes != nil { + actorName = attributes["name"] + } + + if actorName != "" { + filters = append(filters, &clabtypes.GenericFilter{FilterType: "name", Match: actorName}) + } + + if actorFullID != "" { + filters = append(filters, &clabtypes.GenericFilter{FilterType: "id", Match: actorFullID}) + } + + if len(filters) == 0 { + return attributes + } + + containers, err := runtime.ListContainers(ctx, filters) + if err != nil { + log.Debugf("failed to resolve container for event: %v", err) + + return attributes + } + + container := selectContainerForEvent(containers, actorFullID, actorName) + if container == nil { + return attributes + } + + if attributes == nil { + attributes = make(map[string]string) + } + + if !hasIPv4 { + if ipv4 := container.GetContainerIPv4(); ipv4 != "" && ipv4 != clabconstants.NotApplicable { + attributes["mgmt_ipv4"] = ipv4 + } + } + + if !hasIPv6 { + if ipv6 := container.GetContainerIPv6(); ipv6 != "" && ipv6 != clabconstants.NotApplicable { + attributes["mgmt_ipv6"] = ipv6 + } + } + + if len(attributes) == 0 { + return nil + } + + return attributes +} + +func selectContainerForEvent( + containers []clabruntime.GenericContainer, + actorFullID, actorName string, +) *clabruntime.GenericContainer { + if len(containers) == 0 { + return nil + } + + if actorFullID != "" { + for idx := range containers { + container := &containers[idx] + + switch { + case container.ID == actorFullID: + return container + case strings.HasPrefix(container.ID, actorFullID): + return container + case container.ShortID == actorFullID: + return container + } + } + } + + if actorName != "" { + for idx := range containers { + container := &containers[idx] + + for _, name := range container.Names { + if name == actorName { + return container + } + } + } + } + + return &containers[0] +} + +func emitContainerSnapshots( + ctx context.Context, + containers []clabruntime.GenericContainer, + sink chan<- aggregatedEvent, +) { + for idx := range containers { + container := containers[idx] + if !isRunningContainer(&container) { + continue + } + + event := aggregatedEventFromContainerSnapshot(&container) + if event.ActorID == "" && event.ActorName == "" { + continue + } + + select { + case sink <- event: + case <-ctx.Done(): + return + } + } +} + +func aggregatedEventFromContainerSnapshot( + container *clabruntime.GenericContainer, +) aggregatedEvent { + if container == nil { + return aggregatedEvent{} + } + + state := strings.ToLower(container.State) + + short := container.ShortID + if short == "" { + short = shortID(container.ID) + } + + attributes := cloneStringMap(container.Labels) + if attributes == nil { + attributes = make(map[string]string) + } + + if _, ok := attributes["origin"]; !ok { + attributes["origin"] = "snapshot" + } + + if container.Image != "" { + attributes["image"] = container.Image + } + + if container.Status != "" { + attributes["status"] = container.Status + } + + if state != "" { + attributes["state"] = state + } + + if container.NetworkName != "" { + attributes["network"] = container.NetworkName + } + + if container.NetworkSettings.IPv4addr != "" { + attributes["mgmt_ipv4"] = container.GetContainerIPv4() + } + + if container.NetworkSettings.IPv6addr != "" { + attributes["mgmt_ipv6"] = container.GetContainerIPv6() + } + + if len(attributes) == 0 { + attributes = nil + } + + actorName := firstContainerName(container) + + action := state + if action == "" { + action = "snapshot" + } + + return aggregatedEvent{ + Timestamp: time.Now(), + Type: "container", + Action: action, + ActorID: shortID(short), + ActorName: actorName, + ActorFullID: container.ID, + Attributes: attributes, + } +} + +func cloneStringMap(input map[string]string) map[string]string { + if len(input) == 0 { + return nil + } + + result := make(map[string]string, len(input)) + for k, v := range input { + result[k] = v + } + + return result +} + +func isRunningContainer(container *clabruntime.GenericContainer) bool { + if container == nil { + return false + } + + return strings.EqualFold(container.State, "running") +} diff --git a/core/events/types.go b/core/events/types.go new file mode 100644 index 000000000..f7b530a3c --- /dev/null +++ b/core/events/types.go @@ -0,0 +1,13 @@ +package events + +import "time" + +type aggregatedEvent struct { + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` + Action string `json:"action"` + ActorID string `json:"actor_id"` + ActorName string `json:"actor_name"` + ActorFullID string `json:"actor_full_id"` + Attributes map[string]string `json:"attributes,omitempty"` +} diff --git a/docs/cmd/events.md b/docs/cmd/events.md new file mode 100644 index 000000000..0cc6dd7ab --- /dev/null +++ b/docs/cmd/events.md @@ -0,0 +1,70 @@ +# events command + +### Description + +The `events` command streams lifecycle updates for every Containerlab resource and augments them with interface change notifications collected from the container network namespaces. The output combines the selected runtime's event feed (for example Docker) with the netlink information that powers `containerlab inspect interfaces`, so you can observe container activity and interface state changes in real time without selecting a specific lab. + +### Usage + +`containerlab [global-flags] events [local-flags]` + +**aliases:** `ev` + +The command respects the global flags such as `--runtime`, `--debug`, or `--log-level`. It adds local options: + +- `--format` controls the output representation (`plain`, `json`). +- `--initial-state` emits a snapshot of currently running containers and their interface states before following live updates. + +When invoked with no arguments it discovers all running labs and immediately begins streaming events; new labs that start after the command begins are picked up automatically. + +### Event format + +In the default `plain` format every line mirrors the `docker events` format: + +``` + (=, ...) +``` + +* **Runtime events** show the short container ID as the actor and include the original attributes supplied by the container runtime (for example `image`, `name`, `containerlab`, `scope`, …). When `--initial-state` is enabled the stream starts with `container ` snapshots (for example `container running`) that carry an `origin=snapshot` attribute. +* **Interface events** use `type` `interface` and `origin=netlink` in the attribute list. They also report interface-specific data such as `ifname`, `state`, `mtu`, `mac`, `type`, `alias`, and the lab label. The actor is still the container short ID, and the container name is supplied in the attributes (`name=...`). +* Interface notifications are emitted when a link appears, disappears, or when its relevant properties (operational state, MTU, alias, MAC address, type) change. Initial snapshots use the `snapshot` action when `--initial-state` is requested. + +When `--format json` is used, each event becomes a single JSON object on its own line. The fields match the plain output (`timestamp`, `type`, `action`, `actor_id`, `actor_name`, `actor_full_id`) and include an `attributes` map with the same key/value pairs that the plain formatter prints. + +### Examples + +#### Watch an existing lab and new deployments + +``` +$ sudo containerlab events +2024-07-01T11:02:56.123456000Z container start 5d0b5a9ad3f1 (containerlab=frr-lab, image=ghcr.io/srl-labs/frr, name=clab-frr-lab-frr01) +2024-07-01T11:02:57.004321000Z interface create 5d0b5a9ad3f1 (ifname=eth0, index=22, lab=frr-lab, mac=02:42:ac:14:00:02, mtu=1500, name=clab-frr-lab-frr01, origin=netlink, state=up, type=veth) +2024-07-01T11:02:57.104512000Z interface update 5d0b5a9ad3f1 (ifname=eth0, index=22, lab=frr-lab, mac=02:42:ac:14:00:02, mtu=9000, name=clab-frr-lab-frr01, origin=netlink, state=up, type=veth) +2024-07-01T11:05:12.918273000Z container die 5d0b5a9ad3f1 (containerlab=frr-lab, exitCode=0, image=ghcr.io/srl-labs/frr, name=clab-frr-lab-frr01) +2024-07-01T11:05:13.018456000Z interface delete 5d0b5a9ad3f1 (ifname=eth0, index=22, lab=frr-lab, name=clab-frr-lab-frr01, origin=netlink, state=up, type=veth) +``` + +The stream contains all currently running labs and stays active to capture subsequent deployments, restarts, or interface adjustments. + +#### Include existing resources in the stream + +``` +$ sudo containerlab events --initial-state +2024-07-01T11:02:55.912345000Z container running 5d0b5a9ad3f1 (containerlab=frr-lab, image=ghcr.io/srl-labs/frr, name=clab-frr-lab-frr01, origin=snapshot, state=running) +2024-07-01T11:02:55.912678000Z interface snapshot 5d0b5a9ad3f1 (ifname=eth0, index=22, lab=frr-lab, mac=02:42:ac:14:00:02, mtu=1500, name=clab-frr-lab-frr01, origin=netlink, state=up, type=veth) +… +``` + +This mode begins with a point-in-time view of every running container and interface before switching to live updates. + +#### Use with alternative runtimes + +Containerlab streams events from the runtime selected via the global `--runtime` flag. + +> **Currently supported runtime:** `docker` +> Runtimes that do not implement the `events` API (or are not yet supported by Containerlab) will exit with an explanatory error. + +### See also + +* [`inspect interfaces`](inspect/interfaces.md) – produces a point-in-time view of the same interface details that `events` reports continuously. +* `docker events` – the raw runtime feed that Containerlab builds upon. diff --git a/mkdocs.yml b/mkdocs.yml index 68237cb7a..448b61ef6 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -99,6 +99,7 @@ nav: - inspect: - cmd/inspect/index.md - interfaces: cmd/inspect/interfaces.md + - events: cmd/events.md - save: cmd/save.md - exec: cmd/exec.md - generate: cmd/generate.md diff --git a/mocks/mockruntime/runtime.go b/mocks/mockruntime/runtime.go index d408e3297..6a8c6be09 100644 --- a/mocks/mockruntime/runtime.go +++ b/mocks/mockruntime/runtime.go @@ -365,6 +365,22 @@ func (mr *MockContainerRuntimeMockRecorder) StopContainer(arg0, arg1 any) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopContainer", reflect.TypeOf((*MockContainerRuntime)(nil).StopContainer), arg0, arg1) } +// StreamEvents mocks base method. +func (m *MockContainerRuntime) StreamEvents(ctx context.Context, opts runtime.EventStreamOptions) (<-chan runtime.ContainerEvent, <-chan error, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StreamEvents", ctx, opts) + ret0, _ := ret[0].(<-chan runtime.ContainerEvent) + ret1, _ := ret[1].(<-chan error) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// StreamEvents indicates an expected call of StreamEvents. +func (mr *MockContainerRuntimeMockRecorder) StreamEvents(ctx, opts any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamEvents", reflect.TypeOf((*MockContainerRuntime)(nil).StreamEvents), ctx, opts) +} + // StreamLogs mocks base method. func (m *MockContainerRuntime) StreamLogs(ctx context.Context, containerName string) (io.ReadCloser, error) { m.ctrl.T.Helper() diff --git a/runtime/docker/events.go b/runtime/docker/events.go new file mode 100644 index 000000000..636a0171e --- /dev/null +++ b/runtime/docker/events.go @@ -0,0 +1,79 @@ +package docker + +import ( + "context" + "errors" + "fmt" + + dockerTypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" + + clabruntime "github.com/srl-labs/containerlab/runtime" + clabutils "github.com/srl-labs/containerlab/utils" +) + +func (d *DockerRuntime) StreamEvents( + ctx context.Context, + opts clabruntime.EventStreamOptions, +) (<-chan clabruntime.ContainerEvent, <-chan error, error) { + events := make(chan clabruntime.ContainerEvent, 128) + errs := make(chan error, 1) + + go d.streamDockerEvents(ctx, opts, events, errs) + + return events, errs, nil +} + +func (d *DockerRuntime) streamDockerEvents( + ctx context.Context, + opts clabruntime.EventStreamOptions, + eventSink chan<- clabruntime.ContainerEvent, + errSink chan<- error, +) { + defer close(eventSink) + defer close(errSink) + + filtersArgs := filters.NewArgs() + for key, value := range opts.Labels { + if value == "" { + filtersArgs.Add("label", key) + } else { + filtersArgs.Add("label", fmt.Sprintf("%s=%s", key, value)) + } + } + + messages, errs := d.Client.Events(ctx, dockerTypes.EventsOptions{Filters: filtersArgs}) + + for { + select { + case <-ctx.Done(): + return + case err, ok := <-errs: + if !ok { + return + } + + if err != nil && !errors.Is(err, context.Canceled) { + errSink <- err + + return + } + case msg, ok := <-messages: + if !ok { + return + } + + eventData := clabutils.DockerMessageToEventData(msg) + + eventSink <- clabruntime.ContainerEvent{ + Timestamp: eventData.Timestamp, + Type: eventData.Type, + Action: eventData.Action, + ActorID: eventData.ActorID, + ActorName: eventData.ActorName, + ActorFullID: eventData.ActorFullID, + Attributes: eventData.Attributes, + } + } + } +} diff --git a/runtime/ignite/ignite.go b/runtime/ignite/ignite.go index c441ee2bb..218c53224 100644 --- a/runtime/ignite/ignite.go +++ b/runtime/ignite/ignite.go @@ -523,3 +523,10 @@ func (*IgniteRuntime) GetCooCBindMounts() clabtypes.Binds { func (*IgniteRuntime) StreamLogs(ctx context.Context, containerName string) (io.ReadCloser, error) { return nil, fmt.Errorf("StreamLogs not implemented for Ignite runtime") } + +func (*IgniteRuntime) StreamEvents( + context.Context, + clabruntime.EventStreamOptions, +) (<-chan clabruntime.ContainerEvent, <-chan error, error) { + return nil, nil, fmt.Errorf("StreamEvents is not implemented for Ignite runtime") +} diff --git a/runtime/podman/podman.go b/runtime/podman/podman.go index 1f376ea81..e1a088b95 100644 --- a/runtime/podman/podman.go +++ b/runtime/podman/podman.go @@ -494,3 +494,10 @@ func (r *PodmanRuntime) GetRuntimeSocket() (string, error) { func (*PodmanRuntime) StreamLogs(ctx context.Context, containerName string) (io.ReadCloser, error) { return nil, fmt.Errorf("StreamLogs not implemented for Podman runtime") } + +func (*PodmanRuntime) StreamEvents( + context.Context, + runtime.EventStreamOptions, +) (<-chan runtime.ContainerEvent, <-chan error, error) { + return nil, nil, fmt.Errorf("StreamEvents is not implemented for Podman runtime") +} diff --git a/runtime/runtime.go b/runtime/runtime.go index 5758782ed..b56eded9e 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -78,6 +78,8 @@ type ContainerRuntime interface { // StreamLogs returns a reader for the container's logs // The caller needs to close the returned ReadCloser. StreamLogs(ctx context.Context, containerName string) (io.ReadCloser, error) + // StreamEvents streams runtime events that match provided options. + StreamEvents(ctx context.Context, opts EventStreamOptions) (<-chan ContainerEvent, <-chan error, error) } type ContainerStatus string @@ -88,6 +90,34 @@ const ( Stopped = "Stopped" ) +const ( + EventTypeContainer = "container" +) + +const ( + EventActionStart = "start" + EventActionUnpause = "unpause" + EventActionRestart = "restart" + EventActionDie = "die" + EventActionStop = "stop" + EventActionDestroy = "destroy" + EventActionKill = "kill" +) + +type EventStreamOptions struct { + Labels map[string]string +} + +type ContainerEvent struct { + Timestamp time.Time + Type string + Action string + ActorID string + ActorName string + ActorFullID string + Attributes map[string]string +} + type Initializer func() ContainerRuntime type RuntimeOption func(ContainerRuntime) diff --git a/tests/01-smoke/26-events.robot b/tests/01-smoke/26-events.robot new file mode 100644 index 000000000..1033c1602 --- /dev/null +++ b/tests/01-smoke/26-events.robot @@ -0,0 +1,143 @@ +*** Comments *** +This suite verifies the `containerlab events` command for both plain and JSON output formats. + + +*** Settings *** +Library OperatingSystem +Library Process +Library String +Resource ../common.robot + + +*** Variables *** +${runtime} docker +${lab-name} 2-linux-nodes +${topo} ${CURDIR}/01-linux-nodes.clab.yml + + +*** Test Cases *** +Events Command Streams Plain Output + [Documentation] Verify that the plain formatter emits container lifecycle and interface updates enriched with netlink attributes. + ${plain-log} Set Variable /tmp/clab-events-plain.log + ${plain-err} Set Variable /tmp/clab-events-plain.err + Remove File If Exists ${plain-log} + Remove File If Exists ${plain-err} + TRY + Start Events Process events_plain plain ${plain-log} ${plain-err} + Deploy Lab For Events + Sleep 5s + Destroy Lab For Events + Sleep 3s + Stop Events Process events_plain + ${plain-output} = Get File ${plain-log} + Log ${plain-output} + Should Contain ${plain-output} container create + Should Contain ${plain-output} container start + Should Contain ${plain-output} container die + Should Contain ${plain-output} interface create + Should Contain ${plain-output} origin=netlink + FINALLY + Cleanup Events Scenario events_plain + Remove File If Exists ${plain-log} + Remove File If Exists ${plain-err} + END + +Events Command Emits Initial State Snapshot + [Documentation] Verify that enabling --initial-state emits running containers and their interfaces before live updates. + ${snapshot-log} Set Variable /tmp/clab-events-snapshot.log + ${snapshot-err} Set Variable /tmp/clab-events-snapshot.err + Remove File If Exists ${snapshot-log} + Remove File If Exists ${snapshot-err} + TRY + Deploy Lab For Events + Sleep 3s + Start Events Process events_snapshot plain ${snapshot-log} ${snapshot-err} True + Sleep 3s + Stop Events Process events_snapshot + ${snapshot-output} = Get File ${snapshot-log} + Log ${snapshot-output} + Should Contain ${snapshot-output} container running + Should Contain ${snapshot-output} origin=snapshot + Should Contain ${snapshot-output} interface snapshot + FINALLY + Cleanup Events Scenario events_snapshot + Remove File If Exists ${snapshot-log} + Remove File If Exists ${snapshot-err} + END + +Events Command Streams JSON Output + [Documentation] Verify that JSON formatted events remain valid JSON lines and retain interface metadata. + ${json-log} Set Variable /tmp/clab-events-json.log + ${json-err} Set Variable /tmp/clab-events-json.err + Remove File If Exists ${json-log} + Remove File If Exists ${json-err} + TRY + Start Events Process events_json json ${json-log} ${json-err} + Deploy Lab For Events + Sleep 5s + Destroy Lab For Events + Sleep 3s + Stop Events Process events_json + ${json-output} = Get File ${json-log} + Log ${json-output} + Should Not Be Empty ${json-output} + Should Contain ${json-output} "type":"container" + Should Contain ${json-output} "type":"interface" + Should Contain ${json-output} "origin":"netlink" + Validate JSON Lines ${json-log} + ${actor} Set Variable clab-${lab-name}-l1 + ${rc} ${output} = Run And Return Rc And Output + ... bash -lc "jq -r 'select(.actor_name==\"${actor}\") | .actor_id' ${json-log} | head -n 1" + Log ${output} + Should Be Equal As Integers ${rc} 0 + Should Not Be Empty ${output} + FINALLY + Cleanup Events Scenario events_json + Remove File If Exists ${json-log} + Remove File If Exists ${json-err} + END + + +*** Keywords *** +Remove File If Exists + [Arguments] ${path} + Run Keyword And Ignore Error Remove File ${path} + +Start Events Process + [Arguments] ${alias} ${format} ${stdout} ${stderr} ${initial}=False + ${cmd} Set Variable ${CLAB_BIN} --runtime ${runtime} events --format ${format} + ${cmd} Run Keyword If '${initial}'=='True' Catenate ${cmd} --initial-state ELSE Set Variable ${cmd} + Start Process ${cmd} shell=True alias=${alias} stdout=${stdout} stderr=${stderr} + Sleep 1s + +Stop Events Process + [Arguments] ${alias} + Run Keyword And Ignore Error Terminate Process ${alias} kill=True + Run Keyword And Ignore Error Wait For Process ${alias} + +Deploy Lab For Events + ${rc} ${output} = Run And Return Rc And Output + ... ${CLAB_BIN} --runtime ${runtime} deploy -t ${topo} + Log ${output} + Should Be Equal As Integers ${rc} 0 + +Destroy Lab For Events + ${rc} ${output} = Run And Return Rc And Output + ... ${CLAB_BIN} --runtime ${runtime} destroy -t ${topo} --cleanup + Log ${output} + Should Be Equal As Integers ${rc} 0 + +Cleanup Events Scenario + [Arguments] ${alias} + Run Keyword And Ignore Error Terminate Process ${alias} kill=True + Run Keyword And Ignore Error Wait For Process ${alias} + Run Keyword And Ignore Error Run ${CLAB_BIN} --runtime ${runtime} destroy -t ${topo} --cleanup + +Validate JSON Lines + [Arguments] ${path} + ${result} = Process.Run Process + ... bash -lc "python -c 'import json,sys; [json.loads(line) for line in sys.stdin]' < ${path}" + ... shell=True + Log ${result.stdout} + Log ${result.stderr} + Should Be Equal As Integers ${result.rc} 0 diff --git a/utils/events.go b/utils/events.go new file mode 100644 index 000000000..a501ab07b --- /dev/null +++ b/utils/events.go @@ -0,0 +1,48 @@ +package utils + +import ( + "time" + + dockerEvents "github.com/docker/docker/api/types/events" +) + +// DockerEventData captures container-related information from a Docker event message. +type DockerEventData struct { + Timestamp time.Time + Type string + Action string + ActorID string + ActorName string + ActorFullID string + Attributes map[string]string +} + +// DockerMessageToEventData normalizes a Docker event message into DockerEventData. +func DockerMessageToEventData(msg dockerEvents.Message) DockerEventData { + ts := time.Unix(0, msg.TimeNano) + if ts.IsZero() { + ts = time.Unix(msg.Time, 0) + } + if ts.IsZero() { + ts = time.Now() + } + + attributes := make(map[string]string, len(msg.Actor.Attributes)+1) + for k, v := range msg.Actor.Attributes { + attributes[k] = v + } + + if msg.Scope != "" { + attributes["scope"] = msg.Scope + } + + return DockerEventData{ + Timestamp: ts, + Type: string(msg.Type), + Action: string(msg.Action), + ActorID: msg.Actor.ID, + ActorName: attributes["name"], + ActorFullID: msg.Actor.ID, + Attributes: attributes, + } +}