Skip to content

Commit 172a48c

Browse files
committed
endpoint
1 parent a44303f commit 172a48c

File tree

2 files changed

+64
-32
lines changed

2 files changed

+64
-32
lines changed

cmd/pipelines/run.go

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/databricks/databricks-sdk-go/service/pipelines"
2828
"github.com/spf13/cobra"
2929
"golang.org/x/exp/maps"
30-
"golang.org/x/exp/slices"
3130
)
3231

3332
type PipelineUpdateData struct {
@@ -37,7 +36,7 @@ type PipelineUpdateData struct {
3736
LastEventTime string
3837
}
3938

40-
const pipelineUpdateTemplate = `Update {{ .Update.UpdateId }} for pipeline {{- if .Update.Config }}{{ .Update.Config.Name }}{{ end }} {{- if .Update.Config }}{{ .Update.Config.Id }}{{ end }} completed successfully.
39+
const pipelineUpdateTemplate = `Update {{ .Update.UpdateId }} for pipeline {{- if .Update.Config }} {{ .Update.Config.Name }}{{ end }} {{- if .Update.Config }} {{ .Update.Config.Id }}{{ end }} completed successfully.
4140
{{- if .Update.Cause }}
4241
Cause: {{ .Update.Cause }}
4342
{{- end }}
@@ -90,30 +89,6 @@ func getRefreshSelectionString(update pipelines.UpdateInfo) string {
9089
return "default refresh-all"
9190
}
9291

93-
func fetchUpdateProgressEventsForUpdateAscending(ctx context.Context, bundle *bundle.Bundle, pipelineId, updateId string) ([]pipelines.PipelineEvent, error) {
94-
w := bundle.WorkspaceClient()
95-
96-
req := pipelines.ListPipelineEventsRequest{
97-
PipelineId: pipelineId,
98-
Filter: fmt.Sprintf("update_id='%s' AND event_type='update_progress'", updateId),
99-
// OrderBy: []string{"timestamp asc"}, TODO: Add this back in when the API is fixed
100-
}
101-
102-
iterator := w.Pipelines.ListPipelineEvents(ctx, req)
103-
var events []pipelines.PipelineEvent
104-
105-
for iterator.HasNext(ctx) {
106-
event, err := iterator.Next(ctx)
107-
if err != nil {
108-
return nil, err
109-
}
110-
events = append(events, event)
111-
}
112-
slices.Reverse(events)
113-
114-
return events, nil
115-
}
116-
11792
func fetchAndDisplayPipelineUpdate(ctx context.Context, bundle *bundle.Bundle, ref bundleresources.Reference, updateId string) error {
11893
w := bundle.WorkspaceClient()
11994

@@ -137,12 +112,17 @@ func fetchAndDisplayPipelineUpdate(ctx context.Context, bundle *bundle.Bundle, r
137112

138113
latestUpdate := *getUpdateResponse.Update
139114

140-
if latestUpdate.State == pipelines.UpdateInfoStateCompleted {
141-
events, err := fetchUpdateProgressEventsForUpdateAscending(ctx, bundle, pipelineID, updateId)
142-
if err != nil {
143-
return err
144-
}
115+
params := &PipelineEventsQueryParams{
116+
Filter: fmt.Sprintf("update_id='%s' AND event_type='update_progress'", updateId),
117+
OrderBy: "timestamp asc",
118+
}
145119

120+
events, err := fetchAllPipelineEvents(ctx, w, pipelineID, params)
121+
if err != nil {
122+
return err
123+
}
124+
125+
if latestUpdate.State == pipelines.UpdateInfoStateCompleted {
146126
err = displayPipelineUpdate(ctx, latestUpdate, pipelineID, events)
147127
if err != nil {
148128
return err
@@ -165,7 +145,6 @@ func getLastEventTime(events []pipelines.PipelineEvent) string {
165145
return parsedTime.Format("2006-01-02T15:04:05Z")
166146
}
167147

168-
// displayPipelineUpdate displays pipeline update information
169148
func displayPipelineUpdate(ctx context.Context, update pipelines.UpdateInfo, pipelineID string, events []pipelines.PipelineEvent) error {
170149
data := PipelineUpdateData{
171150
PipelineId: pipelineID,

cmd/pipelines/utils.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"strconv"
78

89
"github.com/databricks/cli/bundle"
910
configresources "github.com/databricks/cli/bundle/config/resources"
1011
"github.com/databricks/cli/bundle/resources"
1112
"github.com/databricks/cli/bundle/run"
1213
"github.com/databricks/cli/libs/cmdio"
14+
"github.com/databricks/databricks-sdk-go"
15+
"github.com/databricks/databricks-sdk-go/client"
16+
"github.com/databricks/databricks-sdk-go/service/pipelines"
1317
)
1418

1519
// Copied from cmd/bundle/run.go
@@ -88,3 +92,52 @@ func keyToRunner(b *bundle.Bundle, arg string) (run.Runner, error) {
8892

8993
return runner, nil
9094
}
95+
96+
type PipelineEventsResponse struct {
97+
Events []pipelines.PipelineEvent `json:"events"`
98+
NextPageToken string `json:"next_page_token,omitempty"`
99+
}
100+
101+
type PipelineEventsQueryParams struct {
102+
Filter string `json:"filter,omitempty"`
103+
MaxResults int `json:"max_results,omitempty"`
104+
PageToken string `json:"page_token,omitempty"`
105+
OrderBy string `json:"order_by,omitempty"`
106+
}
107+
108+
func fetchAllPipelineEvents(ctx context.Context, w *databricks.WorkspaceClient, pipelineID string, params *PipelineEventsQueryParams) ([]pipelines.PipelineEvent, error) {
109+
apiClient, err := client.New(w.Config)
110+
if err != nil {
111+
return nil, fmt.Errorf("failed to create API client: %w", err)
112+
}
113+
114+
path := fmt.Sprintf("/api/2.0/pipelines/%s/events", pipelineID)
115+
116+
queryParams := map[string]string{}
117+
if params.Filter != "" {
118+
queryParams["filter"] = params.Filter
119+
}
120+
if params.MaxResults > 0 {
121+
queryParams["max_results"] = strconv.Itoa(params.MaxResults)
122+
}
123+
124+
if params.OrderBy != "" {
125+
queryParams["order_by"] = params.OrderBy
126+
}
127+
128+
var response PipelineEventsResponse
129+
err = apiClient.Do(
130+
ctx,
131+
"GET",
132+
path,
133+
nil,
134+
nil,
135+
queryParams,
136+
&response,
137+
)
138+
if err != nil {
139+
return nil, fmt.Errorf("failed to fetch pipeline events: %w", err)
140+
}
141+
142+
return response.Events, nil
143+
}

0 commit comments

Comments
 (0)