33package pipelines
44
55import (
6+ "context"
67 "encoding/json"
8+ "errors"
79 "fmt"
10+ "time"
811
912 "github.com/databricks/cli/bundle"
13+ "github.com/databricks/cli/bundle/config/resources"
1014 "github.com/databricks/cli/bundle/deploy/terraform"
1115 "github.com/databricks/cli/bundle/phases"
12- "github.com/databricks/cli/bundle/resources"
16+ bundleresources "github.com/databricks/cli/bundle/resources"
1317 "github.com/databricks/cli/bundle/run"
14- "github.com/databricks/cli/bundle/run/output"
18+ bundlerunoutput "github.com/databricks/cli/bundle/run/output"
1519 "github.com/databricks/cli/bundle/statemgmt"
1620 "github.com/databricks/cli/cmd/bundle/utils"
1721 "github.com/databricks/cli/cmd/root"
1822 "github.com/databricks/cli/libs/cmdgroup"
23+ "github.com/databricks/cli/libs/cmdio"
1924 "github.com/databricks/cli/libs/flags"
2025 "github.com/databricks/cli/libs/logdiag"
26+ "github.com/databricks/databricks-sdk-go/service/pipelines"
2127 "github.com/spf13/cobra"
2228 "golang.org/x/exp/maps"
2329)
2430
31+ type PipelineUpdateData struct {
32+ PipelineId string
33+ Update pipelines.UpdateInfo
34+ LastEventTime string
35+ }
36+
37+ // fetchAndDisplayPipelineUpdate fetches the latest update for a pipeline and displays information about it.
38+ func fetchAndDisplayPipelineUpdate (ctx context.Context , bundle * bundle.Bundle , ref bundleresources.Reference , updateId string ) error {
39+ w := bundle .WorkspaceClient ()
40+
41+ pipelineResource := ref .Resource .(* resources.Pipeline )
42+ pipelineID := pipelineResource .ID
43+ if pipelineID == "" {
44+ return errors .New ("unable to get pipeline ID from pipeline" )
45+ }
46+
47+ getUpdateResponse , err := w .Pipelines .GetUpdate (ctx , pipelines.GetUpdateRequest {
48+ PipelineId : pipelineID ,
49+ UpdateId : updateId ,
50+ })
51+ if err != nil {
52+ return err
53+ }
54+
55+ if getUpdateResponse .Update == nil {
56+ return err
57+ }
58+
59+ latestUpdate := * getUpdateResponse .Update
60+
61+ params := & PipelineEventsQueryParams {
62+ Filter : fmt .Sprintf ("update_id='%s' AND event_type='update_progress'" , updateId ),
63+ OrderBy : "timestamp asc" ,
64+ }
65+
66+ events , err := fetchAllPipelineEvents (ctx , w , pipelineID , params )
67+ if err != nil {
68+ return err
69+ }
70+
71+ if latestUpdate .State == pipelines .UpdateInfoStateCompleted {
72+ err = displayPipelineUpdate (ctx , latestUpdate , pipelineID , events )
73+ if err != nil {
74+ return err
75+ }
76+ }
77+
78+ return nil
79+ }
80+
81+ // getLastEventTime returns the timestamp of the last progress event.
82+ // Expects that the events are already sorted by timestamp in ascending order.
83+ func getLastEventTime (events []pipelines.PipelineEvent ) string {
84+ if len (events ) == 0 {
85+ return ""
86+ }
87+ lastEvent := events [len (events )- 1 ]
88+ parsedTime , err := time .Parse (time .RFC3339Nano , lastEvent .Timestamp )
89+ if err != nil {
90+ return ""
91+ }
92+ return parsedTime .Format ("2006-01-02T15:04:05Z" )
93+ }
94+
95+ func displayPipelineUpdate (ctx context.Context , update pipelines.UpdateInfo , pipelineID string , events []pipelines.PipelineEvent ) error {
96+ data := PipelineUpdateData {
97+ PipelineId : pipelineID ,
98+ Update : update ,
99+ LastEventTime : getLastEventTime (events ),
100+ }
101+
102+ return cmdio .RenderWithTemplate (ctx , data , "" , pipelineUpdateTemplate )
103+ }
104+
25105func runCommand () * cobra.Command {
26106 cmd := & cobra.Command {
27107 Use : "run [flags] [KEY]" ,
@@ -100,20 +180,20 @@ Refreshes all tables in the pipeline unless otherwise specified.`,
100180 NoWait : noWait ,
101181 }
102182
103- var output output .RunOutput
183+ var runOutput bundlerunoutput .RunOutput
104184 if restart {
105- output , err = runner .Restart (ctx , & runOptions )
185+ runOutput , err = runner .Restart (ctx , & runOptions )
106186 } else {
107- output , err = runner .Run (ctx , & runOptions )
187+ runOutput , err = runner .Run (ctx , & runOptions )
108188 }
109189 if err != nil {
110190 return err
111191 }
112192
113- if output != nil {
193+ if runOutput != nil {
114194 switch root .OutputType (cmd ) {
115195 case flags .OutputText :
116- resultString , err := output .String ()
196+ resultString , err := runOutput .String ()
117197 if err != nil {
118198 return err
119199 }
@@ -122,7 +202,7 @@ Refreshes all tables in the pipeline unless otherwise specified.`,
122202 return err
123203 }
124204 case flags .OutputJSON :
125- b , err := json .MarshalIndent (output , "" , " " )
205+ b , err := json .MarshalIndent (runOutput , "" , " " )
126206 if err != nil {
127207 return err
128208 }
@@ -135,6 +215,18 @@ Refreshes all tables in the pipeline unless otherwise specified.`,
135215 return fmt .Errorf ("unknown output type %s" , root .OutputType (cmd ))
136216 }
137217 }
218+ ref , err := bundleresources .Lookup (b , key , run .IsRunnable )
219+ if err != nil {
220+ return err
221+ }
222+ if ref .Description .SingularName == "pipeline" && runOutput != nil {
223+ if pipelineOutput , ok := runOutput .(* bundlerunoutput.PipelineOutput ); ok && pipelineOutput .UpdateId != "" {
224+ err = fetchAndDisplayPipelineUpdate (ctx , b , ref , pipelineOutput .UpdateId )
225+ if err != nil {
226+ return err
227+ }
228+ }
229+ }
138230 return nil
139231 }
140232
@@ -151,7 +243,7 @@ Refreshes all tables in the pipeline unless otherwise specified.`,
151243 }
152244
153245 if len (args ) == 0 {
154- completions := resources .Completions (b , run .IsRunnable )
246+ completions := bundleresources .Completions (b , run .IsRunnable )
155247 return maps .Keys (completions ), cobra .ShellCompDirectiveNoFileComp
156248 } else {
157249 // If we know the resource to run, we can complete additional positional arguments.
0 commit comments