
Commit c16a1f0

alyssa-db, denik, shreyas-goenka, andrewnester, and deco-sdk-tagging[bot] authored
pipelines: new logs command (#3269)
## PR Description new `pipeline logs` command **Changes** - Get logs of pipeline in project by resource names with user prompting, or by default, auto-selection when exactly one pipeline exists in project - Update ID filtering via `--update-id` flag, or by default, most recent update id - Log level filtering via `--level` flag (INFO, WARN, ERROR, METRICS) - Event type filtering via `--event-type` flag - Result limiting via `--number` flag - Filter for events only after `--start-time` or before `--end-time` flags **Tests** - Unit tests for `buildFieldFilter`, `buildPipelineEventFilter`, `parseAndFormatTimestamp` - Edge case coverage for no updates, missing pipeline IDs, invalid filters Acceptance tests coming in follow-up: #3270 --------- Co-authored-by: Denis Bilenko <[email protected]> Co-authored-by: shreyas-goenka <[email protected]> Co-authored-by: Andrew Nester <[email protected]> Co-authored-by: deco-sdk-tagging[bot] <192229699+deco-sdk-tagging[bot]@users.noreply.github.com> Co-authored-by: Jeffery Cheng <[email protected]> Co-authored-by: Pieter Noordhuis <[email protected]> Co-authored-by: akolar-db <[email protected]>
1 parent ff570ec commit c16a1f0

7 files changed: +416 −24 lines

acceptance/pipelines/install-pipelines-cli/output.txt

Lines changed: 3 additions & 0 deletions
@@ -17,6 +17,7 @@ Available Commands:
 dry-run Validate correctness of the pipeline's graph
 help Help about any command
 init Initialize a new pipelines project
+logs Retrieve events for a pipeline
 open Open a pipeline in the browser
 run Run a pipeline
 stop Stop a pipeline
@@ -61,6 +62,7 @@ Available Commands:
 dry-run Validate correctness of the pipeline's graph
 help Help about any command
 init Initialize a new pipelines project
+logs Retrieve events for a pipeline
 open Open a pipeline in the browser
 run Run a pipeline
 stop Stop a pipeline
@@ -98,6 +100,7 @@ Available Commands:
 dry-run Validate correctness of the pipeline's graph
 help Help about any command
 init Initialize a new pipelines project
+logs Retrieve events for a pipeline
 open Open a pipeline in the browser
 run Run a pipeline
 stop Stop a pipeline

cmd/pipelines/logs.go

Lines changed: 201 additions & 0 deletions
@@ -0,0 +1,201 @@
package pipelines

import (
	"context"
	"errors"
	"fmt"
	"strings"

	"github.com/databricks/cli/bundle"
	configresources "github.com/databricks/cli/bundle/config/resources"
	"github.com/databricks/cli/bundle/phases"
	"github.com/databricks/cli/bundle/resources"
	"github.com/databricks/cli/bundle/run"

	"github.com/databricks/cli/bundle/statemgmt"
	"github.com/databricks/cli/cmd/bundle/utils"
	"github.com/databricks/cli/cmd/root"
	"github.com/databricks/cli/libs/cmdgroup"
	"github.com/databricks/cli/libs/cmdio"
	"github.com/databricks/cli/libs/logdiag"

	"github.com/spf13/cobra"
)

// resolveLogsArgument auto-selects a pipeline if there's exactly one and no arguments are specified,
// otherwise prompts the user to select a pipeline.
func resolveLogsArgument(ctx context.Context, b *bundle.Bundle, args []string) (string, error) {
	if len(args) == 1 {
		return args[0], nil
	}

	if key := autoSelectSinglePipeline(b); key != "" {
		return key, nil
	}

	if cmdio.IsPromptSupported(ctx) {
		return promptResource(ctx, b, run.IsRunnable, func(ref resources.Reference) bool {
			_, ok := ref.Resource.(*configresources.Pipeline)
			return ok
		})
	}
	return "", errors.New("expected a KEY of the pipeline")
}

// buildFieldFilter creates a SQL filter condition for a field with multiple possible values,
// generating "field in ('value1')" for a single value or "field in ('value1', 'value2')" for multiple values.
func buildFieldFilter(field string, values []string) string {
	if len(values) == 0 {
		return ""
	}

	quotedValues := "'" + strings.Join(values, "', '") + "'"
	return fmt.Sprintf("%s in (%s)", field, quotedValues)
}

// buildPipelineEventFilter constructs a SQL filter string for pipeline events based on the provided parameters.
func buildPipelineEventFilter(updateId string, levels, eventTypes []string, startTime, endTime string) string {
	var filterParts []string

	if updateId != "" {
		filterParts = append(filterParts, fmt.Sprintf("update_id = '%s'", updateId))
	}

	if levelFilter := buildFieldFilter("level", levels); levelFilter != "" {
		filterParts = append(filterParts, levelFilter)
	}

	if typeFilter := buildFieldFilter("event_type", eventTypes); typeFilter != "" {
		filterParts = append(filterParts, typeFilter)
	}

	if startTime != "" {
		filterParts = append(filterParts, fmt.Sprintf("timestamp >= '%s'", startTime))
	}

	if endTime != "" {
		filterParts = append(filterParts, fmt.Sprintf("timestamp <= '%s'", endTime))
	}

	return strings.Join(filterParts, " AND ")
}

func logsCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "logs [flags] [KEY]",
		Args:  root.MaximumNArgs(1),
		Short: "Retrieve events for a pipeline",
		Long: `Retrieve events for the pipeline identified by KEY.
KEY is the unique name of the pipeline, as defined in its YAML file.
By default, show the events of the pipeline's most recent update.

Example usage:
1. pipelines logs pipeline-name --update-id update-1 -n 10
2. pipelines logs pipeline-name --level ERROR,METRICS --event-type update_progress --start-time 2025-01-15T10:30:00Z`,
	}

	var updateId string
	var levels []string
	var eventTypes []string
	var number int
	var startTime string
	var endTime string

	filterGroup := cmdgroup.NewFlagGroup("Event Filter")
	filterGroup.FlagSet().StringVar(&updateId, "update-id", "", "Filter events by update ID. If not provided, uses the most recent update ID.")
	filterGroup.FlagSet().StringSliceVar(&levels, "level", nil, "Filter events by list of log levels (INFO, WARN, ERROR, METRICS).")
	filterGroup.FlagSet().StringSliceVar(&eventTypes, "event-type", nil, "Filter events by list of event types.")
	filterGroup.FlagSet().IntVarP(&number, "number", "n", 0, "Number of events to return.")
	filterGroup.FlagSet().StringVar(&startTime, "start-time", "", "Filter for events that are after this start time (format: 2025-01-15T10:30:00Z)")
	filterGroup.FlagSet().StringVar(&endTime, "end-time", "", "Filter for events that are before this end time (format: 2025-01-15T10:30:00Z)")

	wrappedCmd := cmdgroup.NewCommandWithGroupFlag(cmd)
	wrappedCmd.AddFlagGroup(filterGroup)

	cmd.RunE = func(cmd *cobra.Command, args []string) error {
		ctx := logdiag.InitContext(cmd.Context())
		cmd.SetContext(ctx)

		b := utils.ConfigureBundleWithVariables(cmd)
		if b == nil || logdiag.HasError(ctx) {
			return root.ErrAlreadyPrinted
		}

		phases.Initialize(ctx, b)
		if logdiag.HasError(ctx) {
			return root.ErrAlreadyPrinted
		}

		// Load the deployment state to get pipeline IDs from resources.
		bundle.ApplySeqContext(ctx, b,
			statemgmt.StatePull(),
			statemgmt.Load(),
		)
		if logdiag.HasError(ctx) {
			return root.ErrAlreadyPrinted
		}

		arg, err := resolveLogsArgument(ctx, b, args)
		if err != nil {
			return err
		}

		ref, err := resources.Lookup(b, arg)
		if err != nil {
			return err
		}

		pipeline, ok := ref.Resource.(*configresources.Pipeline)
		if !ok {
			return fmt.Errorf("resource %s is not a pipeline", arg)
		}

		pipelineId := pipeline.ID
		if pipelineId == "" {
			return fmt.Errorf("pipeline ID for pipeline %s is not found", ref.Key)
		}

		w := b.WorkspaceClient()
		if updateId == "" {
			updateId, err = getMostRecentUpdateId(ctx, w, pipelineId)
			if err != nil {
				return fmt.Errorf("failed to get most recent update ID: %w", err)
			}
		}

		if startTime != "" {
			startTime, err = parseAndFormatTimestamp(startTime)
			if err != nil {
				return fmt.Errorf("invalid start time format: %w", err)
			}
		}

		if endTime != "" {
			endTime, err = parseAndFormatTimestamp(endTime)
			if err != nil {
				return fmt.Errorf("invalid end time format: %w", err)
			}
		}

		filter := buildPipelineEventFilter(updateId, levels, eventTypes, startTime, endTime)

		params := &PipelineEventsQueryParams{
			Filter:  filter,
			OrderBy: "timestamp desc",
		}

		// Only set MaxResults if the flag was provided, avoiding setting it to the default value.
		if cmd.Flags().Changed("number") {
			params.MaxResults = number
		}

		events, err := fetchAllPipelineEvents(ctx, w, pipelineId, params)
		if err != nil {
			return fmt.Errorf("failed to fetch events for pipeline %s with update ID %s: %w", pipelineId, updateId, err)
		}

		return cmdio.Render(ctx, events)
	}

	return cmd
}
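
Not part of this commit: a minimal usage sketch, assuming it sits in the same package as logs.go, showing the filter string buildPipelineEventFilter produces for a hypothetical combination of the flags from the command's usage examples. The start time is shown already normalized to millisecond precision, which parseAndFormatTimestamp appears to do based on the tests below.

package pipelines

import "fmt"

// exampleEventFilter is illustrative only: it prints the filter built for
// --update-id update-1 --level ERROR,METRICS --event-type update_progress --start-time 2025-01-15T10:30:00Z
// (start time assumed to be pre-normalized by parseAndFormatTimestamp).
func exampleEventFilter() {
	filter := buildPipelineEventFilter(
		"update-1",                   // --update-id
		[]string{"ERROR", "METRICS"}, // --level
		[]string{"update_progress"},  // --event-type
		"2025-01-15T10:30:00.000Z",   // --start-time, after normalization
		"",                           // no --end-time
	)
	fmt.Println(filter)
	// Prints:
	// update_id = 'update-1' AND level in ('ERROR', 'METRICS') AND event_type in ('update_progress') AND timestamp >= '2025-01-15T10:30:00.000Z'
}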

cmd/pipelines/logs_test.go

Lines changed: 156 additions & 0 deletions
@@ -0,0 +1,156 @@
package pipelines

import (
	"testing"
)

func TestBuildFieldFilter(t *testing.T) {
	tests := []struct {
		name     string
		field    string
		values   []string
		expected string
	}{
		{
			name:     "empty values",
			field:    "level",
			values:   []string{},
			expected: "",
		},
		{
			name:     "single value",
			field:    "level",
			values:   []string{"ERROR"},
			expected: "level in ('ERROR')",
		},
		{
			name:     "multiple values with spaces",
			field:    "level",
			values:   []string{"ERROR", "METRICS"},
			expected: "level in ('ERROR', 'METRICS')",
		},
		{
			name:     "event types multiple values",
			field:    "event_type",
			values:   []string{"update_progress", "flow_progress"},
			expected: "event_type in ('update_progress', 'flow_progress')",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result := buildFieldFilter(tt.field, tt.values)
			if result != tt.expected {
				t.Errorf("buildFieldFilter(%q, %v) = %q, want %q", tt.field, tt.values, result, tt.expected)
			}
		})
	}
}

func TestBuildPipelineEventFilter(t *testing.T) {
	tests := []struct {
		name       string
		updateId   string
		levels     []string
		eventTypes []string
		startTime  string
		endTime    string
		expected   string
	}{
		{
			name:     "no filters",
			expected: "",
		},
		{
			name:     "update id only",
			updateId: "update-1",
			expected: "update_id = 'update-1'",
		},
		{
			name:       "multiple filters",
			updateId:   "update-1",
			levels:     []string{"ERROR", "METRICS"},
			eventTypes: []string{"update_progress"},
			expected:   "update_id = 'update-1' AND level in ('ERROR', 'METRICS') AND event_type in ('update_progress')",
		},
		{
			name:       "event types with multiple values",
			updateId:   "update-2",
			levels:     []string{"INFO"},
			eventTypes: []string{"update_progress", "flow_progress"},
			expected:   "update_id = 'update-2' AND level in ('INFO') AND event_type in ('update_progress', 'flow_progress')",
		},
		{
			name:       "start time only",
			updateId:   "",
			levels:     []string{},
			eventTypes: []string{},
			startTime:  "2025-01-15T10:30:00Z",
			expected:   "timestamp >= '2025-01-15T10:30:00Z'",
		},
		{
			name:       "start time and end time",
			updateId:   "update-3",
			levels:     []string{"ERROR"},
			eventTypes: []string{},
			startTime:  "2025-01-15T10:30:00Z",
			endTime:    "2025-01-15T11:30:00Z",
			expected:   "update_id = 'update-3' AND level in ('ERROR') AND timestamp >= '2025-01-15T10:30:00Z' AND timestamp <= '2025-01-15T11:30:00Z'",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result := buildPipelineEventFilter(tt.updateId, tt.levels, tt.eventTypes, tt.startTime, tt.endTime)
			if result != tt.expected {
				t.Errorf("buildPipelineEventFilter(%q, %v, %v, %q, %q) = %q, want %q", tt.updateId, tt.levels, tt.eventTypes, tt.startTime, tt.endTime, result, tt.expected)
			}
		})
	}
}

func TestParseAndFormatTimestamp(t *testing.T) {
	tests := []struct {
		name        string
		input       string
		expected    string
		expectError bool
	}{
		{
			name:        "valid timestamp",
			input:       "2025-08-11T21:46:14Z",
			expected:    "2025-08-11T21:46:14.000Z",
			expectError: false,
		},
		{
			name:        "empty string",
			input:       "",
			expected:    "",
			expectError: false,
		},
		{
			name:        "invalid timestamp",
			input:       "invalid-timestamp",
			expected:    "",
			expectError: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result, err := parseAndFormatTimestamp(tt.input)
			if tt.expectError {
				if err == nil {
					t.Errorf("expected error for invalid timestamp %q, but got none", tt.input)
				}
				t.Skip()
			}
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			if result != tt.expected {
				t.Errorf("parseAndFormatTimestamp(%q) = %q, want %q", tt.input, result, tt.expected)
			}
		})
	}
}
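
The helpers exercised above (parseAndFormatTimestamp, getMostRecentUpdateId, fetchAllPipelineEvents, PipelineEventsQueryParams) are referenced but not defined in this diff; they live elsewhere in cmd/pipelines. A minimal hypothetical sketch of parseAndFormatTimestamp that would satisfy these test expectations, assuming RFC 3339 input and millisecond-precision output, not the actual implementation:

package pipelines

import "time"

// parseAndFormatTimestamp (sketch only): parses an RFC 3339 timestamp and re-formats it
// with millisecond precision, matching the "2025-08-11T21:46:14.000Z" expectation in
// TestParseAndFormatTimestamp. Empty input passes through without error.
func parseAndFormatTimestamp(ts string) (string, error) {
	if ts == "" {
		return "", nil
	}
	t, err := time.Parse(time.RFC3339, ts)
	if err != nil {
		return "", err
	}
	return t.UTC().Format("2006-01-02T15:04:05.000Z"), nil
}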
