Skip to content

Commit 76b77bd

Browse files
committed
removed templates
1 parent a213bd6 commit 76b77bd

File tree

1 file changed

+156
-2
lines changed

1 file changed

+156
-2
lines changed

cmd/pipelines/run.go

Lines changed: 156 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,180 @@
33
package pipelines
44

55
import (
6+
"context"
67
"encoding/json"
8+
"errors"
79
"fmt"
10+
"strings"
11+
"time"
812

913
"github.com/databricks/cli/bundle"
14+
"github.com/databricks/cli/bundle/config/resources"
1015
"github.com/databricks/cli/bundle/deploy/terraform"
1116
"github.com/databricks/cli/bundle/phases"
1217
bundleresources "github.com/databricks/cli/bundle/resources"
1318
"github.com/databricks/cli/bundle/run"
1419
bundlerunoutput "github.com/databricks/cli/bundle/run/output"
1520
"github.com/databricks/cli/bundle/statemgmt"
1621
"github.com/databricks/cli/cmd/bundle/utils"
17-
pipelinetemplates "github.com/databricks/cli/cmd/pipelines/templates"
1822
"github.com/databricks/cli/cmd/root"
1923
"github.com/databricks/cli/libs/cmdgroup"
24+
"github.com/databricks/cli/libs/cmdio"
2025
"github.com/databricks/cli/libs/flags"
2126
"github.com/databricks/cli/libs/logdiag"
27+
"github.com/databricks/databricks-sdk-go/service/pipelines"
2228
"github.com/spf13/cobra"
2329
"golang.org/x/exp/maps"
30+
"golang.org/x/exp/slices"
2431
)
2532

33+
type PipelineUpdateData struct {
34+
PipelineId string
35+
Update pipelines.UpdateInfo
36+
RefreshSelectionStr string
37+
LastEventTime string
38+
}
39+
40+
const pipelineUpdateTemplate = `Update {{ .Update.UpdateId }} for pipeline {{- if .Update.Config }}{{ .Update.Config.Name }}{{ end }} {{- if .Update.Config }}{{ .Update.Config.Id }}{{ end }} completed successfully.
41+
{{- if .Update.Cause }}
42+
Cause: {{ .Update.Cause }}
43+
{{- end }}
44+
{{- if .Update.CreationTime }}
45+
Creation Time: {{ .Update.CreationTime | pretty_UTC_date_from_millis }}
46+
{{- end }}
47+
{{- if .LastEventTime }}
48+
End Time: {{ .LastEventTime }}
49+
{{- end }}
50+
{{- if or (and .Update.Config .Update.Config.Serverless) .Update.ClusterId }}
51+
Compute: {{ if .Update.Config.Serverless }} serverless {{ else }}{{ .Update.ClusterId }}{{ end }}
52+
{{- end }}
53+
Refresh: {{ .RefreshSelectionStr }}
54+
{{- if .Update.Config }}
55+
{{- if .Update.Config.Channel }}
56+
Channel: {{ .Update.Config.Channel }}
57+
{{- end }}
58+
{{- if .Update.Config.Continuous }}
59+
Continuous: {{ .Update.Config.Continuous }}
60+
{{- end }}
61+
{{- if .Update.Config.Development }}
62+
Development mode: {{ if .Update.Config.Development }}Dev{{ else }}Prod{{ end }}
63+
{{- end }}
64+
{{- if .Update.Config.Environment }}
65+
Environment: {{ .Update.Config.Environment }}
66+
{{- end }}
67+
{{- if or .Update.Config.Catalog .Update.Config.Schema }}
68+
Catalog & Schema: {{ .Update.Config.Catalog }}{{ if and .Update.Config.Catalog .Update.Config.Schema }}.{{ end }}{{ .Update.Config.Schema }}
69+
{{- end }}
70+
{{- end }}
71+
`
72+
73+
func getRefreshSelectionString(update pipelines.UpdateInfo) string {
74+
if update.FullRefresh {
75+
return "full-refresh-all"
76+
}
77+
78+
var parts []string
79+
if len(update.RefreshSelection) > 0 {
80+
parts = append(parts, fmt.Sprintf("refreshed [%s]", strings.Join(update.RefreshSelection, ", ")))
81+
}
82+
if len(update.FullRefreshSelection) > 0 {
83+
parts = append(parts, fmt.Sprintf("full-refreshed [%s]", strings.Join(update.FullRefreshSelection, ", ")))
84+
}
85+
86+
if len(parts) > 0 {
87+
return strings.Join(parts, " | ")
88+
}
89+
90+
return "default refresh-all"
91+
}
92+
93+
func fetchUpdateProgressEventsForUpdateAscending(ctx context.Context, bundle *bundle.Bundle, pipelineId, updateId string) ([]pipelines.PipelineEvent, error) {
94+
w := bundle.WorkspaceClient()
95+
96+
req := pipelines.ListPipelineEventsRequest{
97+
PipelineId: pipelineId,
98+
Filter: fmt.Sprintf("update_id='%s' AND event_type='update_progress'", updateId),
99+
// OrderBy: []string{"timestamp asc"}, TODO: Add this back in when the API is fixed
100+
}
101+
102+
iterator := w.Pipelines.ListPipelineEvents(ctx, req)
103+
var events []pipelines.PipelineEvent
104+
105+
for iterator.HasNext(ctx) {
106+
event, err := iterator.Next(ctx)
107+
if err != nil {
108+
return nil, err
109+
}
110+
events = append(events, event)
111+
}
112+
slices.Reverse(events)
113+
114+
return events, nil
115+
}
116+
117+
func fetchAndDisplayPipelineUpdate(ctx context.Context, bundle *bundle.Bundle, ref bundleresources.Reference, updateId string) error {
118+
w := bundle.WorkspaceClient()
119+
120+
pipelineResource := ref.Resource.(*resources.Pipeline)
121+
pipelineID := pipelineResource.ID
122+
if pipelineID == "" {
123+
return errors.New("unable to get pipeline ID from pipeline")
124+
}
125+
126+
getUpdateResponse, err := w.Pipelines.GetUpdate(ctx, pipelines.GetUpdateRequest{
127+
PipelineId: pipelineID,
128+
UpdateId: updateId,
129+
})
130+
if err != nil {
131+
return err
132+
}
133+
134+
if getUpdateResponse.Update == nil {
135+
return err
136+
}
137+
138+
latestUpdate := *getUpdateResponse.Update
139+
140+
if latestUpdate.State == pipelines.UpdateInfoStateCompleted {
141+
events, err := fetchUpdateProgressEventsForUpdateAscending(ctx, bundle, pipelineID, updateId)
142+
if err != nil {
143+
return err
144+
}
145+
146+
err = displayPipelineUpdate(ctx, latestUpdate, pipelineID, events)
147+
if err != nil {
148+
return err
149+
}
150+
}
151+
152+
return nil
153+
}
154+
155+
// getLastEventTime returns the timestamp of the last progress event
156+
func getLastEventTime(events []pipelines.PipelineEvent) string {
157+
if len(events) == 0 {
158+
return ""
159+
}
160+
lastEvent := events[len(events)-1]
161+
parsedTime, err := time.Parse(time.RFC3339Nano, lastEvent.Timestamp)
162+
if err != nil {
163+
return ""
164+
}
165+
return parsedTime.Format("2006-01-02T15:04:05Z")
166+
}
167+
168+
// displayPipelineUpdate displays pipeline update information
169+
func displayPipelineUpdate(ctx context.Context, update pipelines.UpdateInfo, pipelineID string, events []pipelines.PipelineEvent) error {
170+
data := PipelineUpdateData{
171+
PipelineId: pipelineID,
172+
Update: update,
173+
RefreshSelectionStr: getRefreshSelectionString(update),
174+
LastEventTime: getLastEventTime(events),
175+
}
176+
177+
return cmdio.RenderWithTemplate(ctx, data, "", pipelineUpdateTemplate)
178+
}
179+
26180
func runCommand() *cobra.Command {
27181
cmd := &cobra.Command{
28182
Use: "run [flags] [KEY]",
@@ -142,7 +296,7 @@ Refreshes all tables in the pipeline unless otherwise specified.`,
142296
}
143297
if ref.Description.SingularName == "pipeline" && runOutput != nil {
144298
if pipelineOutput, ok := runOutput.(*bundlerunoutput.PipelineOutput); ok && pipelineOutput.UpdateId != "" {
145-
err = pipelinetemplates.FetchAndDisplayPipelineUpdate(ctx, b, ref, pipelineOutput.UpdateId)
299+
err = fetchAndDisplayPipelineUpdate(ctx, b, ref, pipelineOutput.UpdateId)
146300
if err != nil {
147301
return err
148302
}

0 commit comments

Comments
 (0)