Skip to content

Commit c160636

Browse files
committed
refactored
1 parent ffdddb8 commit c160636

File tree

3 files changed

+54
-29
lines changed

3 files changed

+54
-29
lines changed

cmd/pipelines/run.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,19 @@ func eventTimeDifference(earlierEvent, laterEvent pipelines.PipelineEvent) (time
105105
return timeDifference, nil
106106
}
107107

108-
// enrichEvents adds duration information and phase name to a progress event
108+
// enrichEvents adds duration information and phase name to each progress event.
109109
// Expects that the events are already sorted by timestamp in ascending order.
110-
func enrichEvents(events []pipelines.PipelineEvent) ([]ProgressEventWithDuration, error) {
110+
// For the last event, duration is calculated using endTime.
111+
func enrichEvents(events []pipelines.PipelineEvent, endTime string) ([]ProgressEventWithDuration, error) {
111112
var progressEventsWithDuration []ProgressEventWithDuration
112-
for j := range len(events) - 1 {
113+
for j := range len(events) {
114+
var nextEvent pipelines.PipelineEvent
113115
event := events[j]
114-
nextEvent := events[j+1]
116+
if j == len(events)-1 {
117+
nextEvent = pipelines.PipelineEvent{Timestamp: endTime}
118+
} else {
119+
nextEvent = events[j+1]
120+
}
115121
timeDifference, err := eventTimeDifference(event, nextEvent)
116122
if err != nil {
117123
return nil, err
@@ -130,8 +136,13 @@ func enrichEvents(events []pipelines.PipelineEvent) ([]ProgressEventWithDuration
130136
return progressEventsWithDuration, nil
131137
}
132138

133-
func displayProgressEvents(ctx context.Context, events []pipelines.PipelineEvent) error {
134-
progressEvents, err := enrichEvents(events)
139+
// displayProgressEventsDurations displays the progress events with duration and phase name.
140+
// Omits displaying the time of the last event.
141+
func displayProgressEventsDurations(ctx context.Context, events []pipelines.PipelineEvent) error {
142+
if len(events) <= 1 {
143+
return fmt.Errorf("no progress events to display")
144+
}
145+
progressEvents, err := enrichEvents(events[:len(events)-1], getLastEventTime(events))
135146
if err != nil {
136147
return fmt.Errorf("failed to enrich progress events: %w", err)
137148
}
@@ -176,14 +187,12 @@ func fetchAndDisplayPipelineUpdate(ctx context.Context, w *databricks.WorkspaceC
176187
return err
177188
}
178189

179-
if latestUpdate.State == pipelines.UpdateInfoStateCompleted {
180-
err = displayPipelineUpdate(ctx, latestUpdate, pipelineId, events)
181-
if err != nil {
182-
return err
183-
}
190+
err = displayPipelineUpdate(ctx, latestUpdate, pipelineId, events)
191+
if err != nil {
192+
return err
184193
}
185194

186-
err = displayProgressEvents(ctx, events)
195+
err = displayProgressEventsDurations(ctx, events)
187196
if err != nil {
188197
return err
189198
}
@@ -202,7 +211,7 @@ func getLastEventTime(events []pipelines.PipelineEvent) string {
202211
if err != nil {
203212
return ""
204213
}
205-
return parsedTime.Format("2006-01-02T15:04:05Z")
214+
return parsedTime.Format(time.RFC3339Nano)
206215
}
207216

208217
func displayPipelineUpdate(ctx context.Context, update pipelines.UpdateInfo, pipelineId string, events []pipelines.PipelineEvent) error {
@@ -328,10 +337,13 @@ Refreshes all tables in the pipeline unless otherwise specified.`,
328337
return fmt.Errorf("unknown output type %s", root.OutputType(cmd))
329338
}
330339
}
340+
331341
ref, err := bundleresources.Lookup(b, key, run.IsRunnable)
332342
if err != nil {
333343
return err
334344
}
345+
// Only displays the following pipeline run summary if the pipeline completes successfully,
346+
// as runner.Run() returns an error if the pipeline doesn't complete successfully.
335347
if ref.Description.SingularName == "pipeline" && runOutput != nil {
336348
if pipelineOutput, ok := runOutput.(*bundlerunoutput.PipelineOutput); ok && pipelineOutput.UpdateId != "" {
337349
w := b.WorkspaceClient()

cmd/pipelines/run_test.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ func TestDisplayPipelineUpdate(t *testing.T) {
5050
EventType: "update_progress",
5151
},
5252
},
53-
expected: `Update for pipeline test-pipeline completed successfully.
53+
expected: `
54+
Update for pipeline test-pipeline completed successfully.
5455
Pipeline ID: pipeline-789
5556
Update start time: 2022-01-01T00:00:00Z
5657
Update end time: 2022-01-01T01:00:00Z.
@@ -72,7 +73,8 @@ Pipeline configurations for this update:
7273
},
7374
pipelineID: "pipeline-789",
7475
events: []pipelines.PipelineEvent{},
75-
expected: `Update for pipeline completed successfully.
76+
expected: `
77+
Update for pipeline completed successfully.
7678
Pipeline configurations for this update:
7779
• All tables are refreshed
7880
`,
@@ -91,7 +93,8 @@ Pipeline configurations for this update:
9193
EventType: "update_progress",
9294
},
9395
},
94-
expected: `Update for pipeline completed successfully.
96+
expected: `
97+
Update for pipeline completed successfully.
9598
Pipeline configurations for this update:
9699
• Refreshed [table1, table2]
97100
• Full refreshed [table3]
@@ -109,7 +112,8 @@ Pipeline configurations for this update:
109112
},
110113
pipelineID: "pipeline-456",
111114
events: []pipelines.PipelineEvent{},
112-
expected: `Update for pipeline test-pipeline completed successfully.
115+
expected: `
116+
Update for pipeline test-pipeline completed successfully.
113117
Pipeline ID: pipeline-789
114118
Pipeline configurations for this update:
115119
• All tables are refreshed
@@ -124,7 +128,8 @@ Pipeline configurations for this update:
124128
},
125129
pipelineID: "pipeline-456",
126130
events: []pipelines.PipelineEvent{},
127-
expected: `Update for pipeline completed successfully.
131+
expected: `
132+
Update for pipeline completed successfully.
128133
Pipeline configurations for this update:
129134
• All tables are refreshed
130135
• Classic compute: cluster-123
@@ -267,7 +272,7 @@ func TestReadableDuration(t *testing.T) {
267272
}
268273
}
269274

270-
func TestDisplayProgressEvents(t *testing.T) {
275+
func TestDisplayProgressEventsDurations(t *testing.T) {
271276
tests := []struct {
272277
name string
273278
events []pipelines.PipelineEvent
@@ -342,21 +347,24 @@ WAITING_FOR_RESOURCES 500ms
342347
RUNNING 750ms
343348
`,
344349
},
350+
351+
{
352+
name: "edge cases - empty event",
353+
events: []pipelines.PipelineEvent{},
354+
expected: "",
355+
wantErr: true,
356+
},
345357
{
346358
name: "edge cases - single event",
347359
events: []pipelines.PipelineEvent{
348360
{
349361
Timestamp: "2022-01-01T00:00:00Z",
350362
EventType: "update_progress",
351-
Message: "Update test-update-single is COMPLETED.",
363+
Message: "Update test-update-single is RUNNING.",
352364
},
353365
},
354366
expected: "",
355-
},
356-
{
357-
name: "edge cases - empty event",
358-
events: []pipelines.PipelineEvent{},
359-
expected: "",
367+
wantErr: true,
360368
},
361369
}
362370

@@ -368,9 +376,13 @@ RUNNING 750ms
368376
cmdIO := cmdio.NewIO(ctx, flags.OutputText, nil, &buf, &buf, "", "")
369377
ctx = cmdio.InContext(ctx, cmdIO)
370378

371-
err := displayProgressEvents(ctx, tt.events)
372-
assert.NoError(t, err)
373-
assert.Equal(t, tt.expected, buf.String())
379+
err := displayProgressEventsDurations(ctx, tt.events)
380+
if tt.wantErr {
381+
assert.Error(t, err)
382+
} else {
383+
assert.NoError(t, err)
384+
assert.Equal(t, tt.expected, buf.String())
385+
}
374386
})
375387
}
376388
}

cmd/pipelines/templates.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pipelines
22

3-
const pipelineUpdateTemplate = `Update for pipeline {{- if .Update.Config }} {{ .Update.Config.Name }}{{ end }} completed successfully.
3+
const pipelineUpdateTemplate = `
4+
Update for pipeline {{- if .Update.Config }} {{ .Update.Config.Name }}{{ end }} completed successfully.
45
{{- if .Update.Config }}
56
Pipeline ID: {{ .Update.Config.Id }}
67
{{- end }}

0 commit comments

Comments
 (0)