Skip to content

Commit 2c4255b

Browse files
authored
pipelines run: duration per event of update_progress (#3337)
## Changes On successful runs, display the time elapsed between consecutive update_progress events. ## Tests Acceptance tests: #3355 Example output: <img width="265" height="97" alt="image" src="https://github.com/user-attachments/assets/fee7d9e3-718a-401b-aa5d-aec1baf5d649" />
1 parent 5fe0428 commit 2c4255b

File tree

9 files changed

+413
-28
lines changed

9 files changed

+413
-28
lines changed

acceptance/pipelines/e2e/output.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ View your pipeline my_project_pipeline here: [DATABRICKS_URL]/pipelines/[UUID]?o
2323
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
2424

2525
Update ID: [UUID]
26+
2627
Update for pipeline completed successfully.
2728
Pipeline configurations for this update:
2829
• All tables are refreshed
@@ -66,6 +67,7 @@ View your pipeline my_project_pipeline_2 here: [DATABRICKS_URL]/pipelines/[UUID]
6667
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
6768

6869
Update ID: [UUID]
70+
6971
Update for pipeline completed successfully.
7072
Pipeline configurations for this update:
7173
• All tables are refreshed

acceptance/pipelines/run/no-wait/output.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ View your pipeline my_pipeline here: [DATABRICKS_URL]/pipelines/[UUID]?o=[NUMID]
1111
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
1212

1313
Update ID: [UUID]
14+
1415
Update for pipeline completed successfully.
1516
Pipeline configurations for this update:
1617
• All tables are refreshed

acceptance/pipelines/run/refresh-flags/output.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ View your pipeline my_pipeline here: [DATABRICKS_URL]/pipelines/[UUID]?o=[NUMID]
1111
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
1212

1313
Update ID: [UUID]
14+
1415
Update for pipeline completed successfully.
1516
Pipeline configurations for this update:
1617
• All tables are refreshed
@@ -32,6 +33,7 @@ Pipeline configurations for this update:
3233
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
3334

3435
Update ID: [UUID]
36+
3537
Update for pipeline completed successfully.
3638
Pipeline configurations for this update:
3739
• All tables are refreshed
@@ -50,6 +52,7 @@ Pipeline configurations for this update:
5052
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
5153

5254
Update ID: [UUID]
55+
5356
Update for pipeline completed successfully.
5457
Pipeline configurations for this update:
5558
• All tables are refreshed
@@ -71,6 +74,7 @@ Pipeline configurations for this update:
7174
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
7275

7376
Update ID: [UUID]
77+
7478
Update for pipeline completed successfully.
7579
Pipeline configurations for this update:
7680
• All tables are refreshed

acceptance/pipelines/run/restart/output.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ View your pipeline my_pipeline here: [DATABRICKS_URL]/pipelines/[UUID]?o=[NUMID]
1111
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
1212

1313
Update ID: [UUID]
14+
1415
Update for pipeline completed successfully.
1516
Pipeline configurations for this update:
1617
• All tables are refreshed

acceptance/pipelines/run/run-pipeline/output.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ View your pipeline my_pipeline here: [DATABRICKS_URL]/pipelines/[UUID]?o=[NUMID]
1111
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
1212

1313
Update ID: [UUID]
14+
1415
Update for pipeline completed successfully.
1516
Pipeline configurations for this update:
1617
• All tables are refreshed
@@ -20,6 +21,7 @@ Pipeline configurations for this update:
2021
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
2122

2223
Update ID: [UUID]
24+
2325
Update for pipeline completed successfully.
2426
Pipeline configurations for this update:
2527
• All tables are refreshed

acceptance/pipelines/stop/output.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ View your pipeline my_pipeline here: [DATABRICKS_URL]/pipelines/[UUID]?o=[NUMID]
1010
Update URL: [DATABRICKS_URL]/#joblist/pipelines/[UUID]/updates/[UUID]
1111

1212
Update ID: [UUID]
13+
1314
Update for pipeline completed successfully.
1415
Pipeline configurations for this update:
1516
• All tables are refreshed

cmd/pipelines/run.go

Lines changed: 150 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"encoding/json"
88
"errors"
99
"fmt"
10+
"strings"
1011
"time"
1112

1213
"github.com/databricks/cli/bundle"
@@ -23,6 +24,7 @@ import (
2324
"github.com/databricks/cli/libs/cmdio"
2425
"github.com/databricks/cli/libs/flags"
2526
"github.com/databricks/cli/libs/logdiag"
27+
"github.com/databricks/databricks-sdk-go"
2628
"github.com/databricks/databricks-sdk-go/service/pipelines"
2729
"github.com/spf13/cobra"
2830
"golang.org/x/exp/maps"
@@ -34,26 +36,147 @@ type PipelineUpdateData struct {
3436
LastEventTime string
3537
}
3638

37-
// fetchAndDisplayPipelineUpdate fetches the latest update for a pipeline and displays information about it.
38-
func fetchAndDisplayPipelineUpdate(ctx context.Context, bundle *bundle.Bundle, ref bundleresources.Reference, updateId string) error {
39-
w := bundle.WorkspaceClient()
39+
type ProgressEventWithDuration struct {
40+
Event pipelines.PipelineEvent
41+
Duration string
42+
Phase string
43+
}
44+
45+
type ProgressEventsData struct {
46+
ProgressEvents []ProgressEventWithDuration
47+
}
48+
49+
// phaseFromUpdateProgress extracts the phase name from an event message by checking if it contains any of the UpdateInfoState values
50+
// Example: "Update 6fc8a8 is WAITING_FOR_RESOURCES." -> "WAITING_FOR_RESOURCES"
51+
func phaseFromUpdateProgress(eventMessage string) (string, error) {
52+
var updateInfoState pipelines.UpdateInfoState
53+
updateInfoStates := updateInfoState.Values()
54+
55+
for _, state := range updateInfoStates {
56+
if strings.Contains(eventMessage, string(state)) {
57+
return string(state), nil
58+
}
59+
}
60+
61+
return "", fmt.Errorf("no phase found in message: %s", eventMessage)
62+
}
63+
64+
// readableDuration returns a readable duration string for a given duration.
65+
func readableDuration(diff time.Duration) (string, error) {
66+
if diff < 0 {
67+
return "", fmt.Errorf("duration cannot be negative: %v", diff)
68+
}
69+
70+
if diff < time.Second {
71+
milliseconds := int(diff.Milliseconds())
72+
return fmt.Sprintf("%dms", milliseconds), nil
73+
}
74+
75+
if diff < time.Minute {
76+
return fmt.Sprintf("%.1fs", diff.Seconds()), nil
77+
}
78+
79+
if diff < time.Hour {
80+
minutes := int(diff.Minutes())
81+
seconds := int(diff.Seconds()) % 60
82+
return fmt.Sprintf("%dm %ds", minutes, seconds), nil
83+
}
84+
85+
hours := int(diff.Hours())
86+
minutes := int(diff.Minutes()) % 60
87+
return fmt.Sprintf("%dh %dm", hours, minutes), nil
88+
}
89+
90+
// eventTimeDifference returns the time difference between two events.
91+
func eventTimeDifference(earlierEvent, laterEvent pipelines.PipelineEvent) (time.Duration, error) {
92+
earlierTime, err := time.Parse(time.RFC3339Nano, earlierEvent.Timestamp)
93+
if err != nil {
94+
return 0, err
95+
}
96+
laterTime, err := time.Parse(time.RFC3339Nano, laterEvent.Timestamp)
97+
if err != nil {
98+
return 0, err
99+
}
100+
101+
timeDifference := laterTime.Sub(earlierTime)
102+
if timeDifference < 0 {
103+
return 0, errors.New("second event timestamp must be after first event timestamp")
104+
}
105+
return timeDifference, nil
106+
}
107+
108+
// enrichEvents adds duration information and phase name to each progress event.
109+
// Expects that the events are already sorted by timestamp in ascending order.
110+
// For the last event, duration is calculated using endTime.
111+
func enrichEvents(events []pipelines.PipelineEvent, endTime string) ([]ProgressEventWithDuration, error) {
112+
var progressEventsWithDuration []ProgressEventWithDuration
113+
for j := range events {
114+
var nextEvent pipelines.PipelineEvent
115+
event := events[j]
116+
if j == len(events)-1 {
117+
nextEvent = pipelines.PipelineEvent{Timestamp: endTime}
118+
} else {
119+
nextEvent = events[j+1]
120+
}
121+
timeDifference, err := eventTimeDifference(event, nextEvent)
122+
if err != nil {
123+
return nil, err
124+
}
125+
readableDuration, err := readableDuration(timeDifference)
126+
if err != nil {
127+
return nil, err
128+
}
129+
phase, err := phaseFromUpdateProgress(event.Message)
130+
if err != nil {
131+
return nil, err
132+
}
133+
progressEventsWithDuration = append(progressEventsWithDuration, ProgressEventWithDuration{
134+
Event: event,
135+
Duration: readableDuration,
136+
Phase: phase,
137+
})
138+
}
139+
140+
return progressEventsWithDuration, nil
141+
}
142+
143+
// displayProgressEventsDurations renders each progress event with its phase name and the time spent until the next event.
144+
// The last event is not rendered as a row of its own; it only supplies the end time for the event before it.
// Returns nil without rendering anything when there are fewer than two events.
145+
func displayProgressEventsDurations(ctx context.Context, events []pipelines.PipelineEvent) error {
146+
if len(events) <= 1 {
147+
return nil
148+
}
149+
// Enrich every event except the last; getLastEventTime(events) is passed as the closing timestamp
// (presumably the timestamp of the final event - confirm against getLastEventTime).
progressEvents, err := enrichEvents(events[:len(events)-1], getLastEventTime(events))
150+
if err != nil {
151+
return fmt.Errorf("failed to enrich progress events: %w", err)
152+
}
40153

41-
pipelineResource := ref.Resource.(*resources.Pipeline)
42-
pipelineID := pipelineResource.ID
43-
if pipelineID == "" {
44-
return errors.New("unable to get pipeline ID from pipeline")
154+
data := ProgressEventsData{
155+
ProgressEvents: progressEvents,
156+
}
157+
158+
return cmdio.RenderWithTemplate(ctx, data, "", progressEventsTemplate)
159+
}
160+
161+
// fetchAndDisplayPipelineUpdate fetches the update and the update's associated update_progress events' durations.
162+
func fetchAndDisplayPipelineUpdate(ctx context.Context, w *databricks.WorkspaceClient, pipelineId, updateId string) error {
163+
if pipelineId == "" {
164+
return errors.New("no pipeline ID provided")
165+
}
166+
if updateId == "" {
167+
return errors.New("no update ID provided")
45168
}
46169

47170
getUpdateResponse, err := w.Pipelines.GetUpdate(ctx, pipelines.GetUpdateRequest{
48-
PipelineId: pipelineID,
171+
PipelineId: pipelineId,
49172
UpdateId: updateId,
50173
})
51174
if err != nil {
52175
return err
53176
}
54177

55178
if getUpdateResponse.Update == nil {
56-
return err
179+
return fmt.Errorf("no update found with id %s for pipeline %s", updateId, pipelineId)
57180
}
58181

59182
latestUpdate := *getUpdateResponse.Update
@@ -63,16 +186,19 @@ func fetchAndDisplayPipelineUpdate(ctx context.Context, bundle *bundle.Bundle, r
63186
OrderBy: "timestamp asc",
64187
}
65188

66-
events, err := fetchAllPipelineEvents(ctx, w, pipelineID, params)
189+
events, err := fetchAllPipelineEvents(ctx, w, pipelineId, params)
67190
if err != nil {
68191
return err
69192
}
70193

71-
if latestUpdate.State == pipelines.UpdateInfoStateCompleted {
72-
err = displayPipelineUpdate(ctx, latestUpdate, pipelineID, events)
73-
if err != nil {
74-
return err
75-
}
194+
err = displayPipelineUpdate(ctx, latestUpdate, pipelineId, events)
195+
if err != nil {
196+
return err
197+
}
198+
199+
err = displayProgressEventsDurations(ctx, events)
200+
if err != nil {
201+
return err
76202
}
77203

78204
return nil
@@ -89,12 +215,12 @@ func getLastEventTime(events []pipelines.PipelineEvent) string {
89215
if err != nil {
90216
return ""
91217
}
92-
return parsedTime.Format("2006-01-02T15:04:05Z")
218+
return parsedTime.Format(time.RFC3339Nano)
93219
}
94220

95-
func displayPipelineUpdate(ctx context.Context, update pipelines.UpdateInfo, pipelineID string, events []pipelines.PipelineEvent) error {
221+
func displayPipelineUpdate(ctx context.Context, update pipelines.UpdateInfo, pipelineId string, events []pipelines.PipelineEvent) error {
96222
data := PipelineUpdateData{
97-
PipelineId: pipelineID,
223+
PipelineId: pipelineId,
98224
Update: update,
99225
LastEventTime: getLastEventTime(events),
100226
}
@@ -215,15 +341,19 @@ Refreshes all tables in the pipeline unless otherwise specified.`,
215341
return fmt.Errorf("unknown output type %s", root.OutputType(cmd))
216342
}
217343
}
344+
218345
ref, err := bundleresources.Lookup(b, key, run.IsRunnable)
219346
if err != nil {
220347
return err
221348
}
349+
// Only displays the following pipeline run summary if the pipeline completes successfully,
350+
// as runner.Run() returns an error if the pipeline doesn't complete successfully.
222351
if ref.Description.SingularName == "pipeline" && runOutput != nil {
223352
if pipelineOutput, ok := runOutput.(*bundlerunoutput.PipelineOutput); ok && pipelineOutput.UpdateId != "" {
224-
err = fetchAndDisplayPipelineUpdate(ctx, b, ref, pipelineOutput.UpdateId)
353+
w := b.WorkspaceClient()
354+
err = fetchAndDisplayPipelineUpdate(ctx, w, ref.Resource.(*resources.Pipeline).ID, pipelineOutput.UpdateId)
225355
if err != nil {
226-
return err
356+
return fmt.Errorf("failed to fetch and display pipeline update: %w", err)
227357
}
228358
}
229359
}

0 commit comments

Comments
 (0)