Skip to content

Commit 8b00572

Browse files
authored
[Exporter] Improve exporting of databricks_pipeline resources (#4142)
## Changes <!-- Summary of your changes that are easy to understand --> Changes include: - Use `List` + iterator instead of waiting for the full list — improves performance in big workspaces with a lot of DLT pipelines - Better handling of pipelines deployed via DABs — fixes an error that led to emitting notebooks even for DLT pipelines deployed with DABs. - Emit `databricks_schema` for pipelines with direct publishing mode enabled. ## Tests <!-- How is this tested? Please see the checklist below and also describe any other relevant tests --> - [x] `make test` run locally - [ ] relevant change in `docs/` folder - [ ] covered with integration tests in `internal/acceptance` - [ ] relevant acceptance tests are passing - [ ] using Go SDK
1 parent 83984a6 commit 8b00572

File tree

3 files changed

+38
-23
lines changed

3 files changed

+38
-23
lines changed

exporter/exporter_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ var meAdminFixture = qa.HTTPFixture{
252252
var emptyPipelines = qa.HTTPFixture{
253253
Method: "GET",
254254
ReuseRequest: true,
255-
Resource: "/api/2.0/pipelines?max_results=50",
255+
Resource: "/api/2.0/pipelines?max_results=100",
256256
Response: pipelines.ListPipelinesResponse{},
257257
}
258258

@@ -2021,7 +2021,7 @@ func TestImportingDLTPipelines(t *testing.T) {
20212021
emptyIpAccessLIst,
20222022
{
20232023
Method: "GET",
2024-
Resource: "/api/2.0/pipelines?max_results=50",
2024+
Resource: "/api/2.0/pipelines?max_results=100",
20252025
Response: pipelines.ListPipelinesResponse{
20262026
Statuses: []pipelines.PipelineStateInfo{
20272027
{
@@ -2236,7 +2236,7 @@ func TestImportingDLTPipelinesMatchingOnly(t *testing.T) {
22362236
userReadFixture,
22372237
{
22382238
Method: "GET",
2239-
Resource: "/api/2.0/pipelines?max_results=50",
2239+
Resource: "/api/2.0/pipelines?max_results=100",
22402240
Response: pipelines.ListPipelinesResponse{
22412241
Statuses: []pipelines.PipelineStateInfo{
22422242
{
@@ -2601,7 +2601,7 @@ func TestIncrementalDLTAndMLflowWebhooks(t *testing.T) {
26012601
},
26022602
{
26032603
Method: "GET",
2604-
Resource: "/api/2.0/pipelines?max_results=50",
2604+
Resource: "/api/2.0/pipelines?max_results=100",
26052605
Response: pipelines.ListPipelinesResponse{
26062606
Statuses: []pipelines.PipelineStateInfo{
26072607
{

exporter/importables.go

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2002,23 +2002,22 @@ var resourcesMap map[string]importable = map[string]importable{
20022002
return name + "_" + d.Id()
20032003
},
20042004
List: func(ic *importContext) error {
2005-
w, err := ic.Client.WorkspaceClient()
2006-
if err != nil {
2007-
return err
2008-
}
2009-
pipelinesList, err := w.Pipelines.ListPipelinesAll(ic.Context, pipelines.ListPipelinesRequest{
2010-
MaxResults: 50,
2005+
it := ic.workspaceClient.Pipelines.ListPipelines(ic.Context, pipelines.ListPipelinesRequest{
2006+
MaxResults: 100,
20112007
})
2012-
if err != nil {
2013-
return err
2014-
}
2015-
for i, q := range pipelinesList {
2008+
i := 0
2009+
for it.HasNext(ic.Context) {
2010+
q, err := it.Next(ic.Context)
2011+
if err != nil {
2012+
return err
2013+
}
2014+
i++
20162015
if !ic.MatchesName(q.Name) {
20172016
continue
20182017
}
20192018
var modifiedAt int64
20202019
if ic.incremental {
2021-
pipeline, err := w.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{
2020+
pipeline, err := ic.workspaceClient.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{
20222021
PipelineId: q.PipelineId,
20232022
})
20242023
if err != nil {
@@ -2030,21 +2029,37 @@ var resourcesMap map[string]importable = map[string]importable{
20302029
Resource: "databricks_pipeline",
20312030
ID: q.PipelineId,
20322031
}, modifiedAt, fmt.Sprintf("DLT Pipeline '%s'", q.Name))
2033-
log.Printf("[INFO] Imported %d of %d DLT Pipelines", i+1, len(pipelinesList))
2032+
if i%100 == 0 {
2033+
log.Printf("[INFO] Imported %d DLT Pipelines", i)
2034+
}
20342035
}
2036+
log.Printf("[INFO] Listed %d DLT pipelines", i)
20352037
return nil
20362038
},
20372039
Import: func(ic *importContext, r *resource) error {
20382040
var pipeline tfpipelines.Pipeline
20392041
s := ic.Resources["databricks_pipeline"].Schema
20402042
common.DataToStructPointer(r.Data, s, &pipeline)
2041-
if pipeline.Catalog != "" && pipeline.Target != "" {
2042-
ic.Emit(&resource{
2043-
Resource: "databricks_schema",
2044-
ID: pipeline.Catalog + "." + pipeline.Target,
2045-
})
2043+
if pipeline.Deployment != nil && pipeline.Deployment.Kind == "BUNDLE" {
2044+
log.Printf("[INFO] Skipping processing of DLT Pipeline with ID %s (%s) as deployed with DABs",
2045+
r.ID, pipeline.Name)
2046+
return nil
2047+
}
2048+
if pipeline.Catalog != "" {
2049+
var schemaName string
2050+
if pipeline.Target != "" {
2051+
schemaName = pipeline.Target
2052+
} else if pipeline.Schema != "" {
2053+
schemaName = pipeline.Schema
2054+
}
2055+
if schemaName != "" {
2056+
ic.Emit(&resource{
2057+
Resource: "databricks_schema",
2058+
ID: pipeline.Catalog + "." + pipeline.Target,
2059+
})
2060+
}
20462061
}
2047-
if pipeline.Deployment == nil || pipeline.Deployment.Kind == "BUNDLE" {
2062+
if pipeline.Deployment == nil || pipeline.Deployment.Kind != "BUNDLE" {
20482063
for _, lib := range pipeline.Libraries {
20492064
if lib.Notebook != nil {
20502065
ic.emitNotebookOrRepo(lib.Notebook.Path)

exporter/importables_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1369,7 +1369,7 @@ func TestIncrementalListDLT(t *testing.T) {
13691369
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
13701370
{
13711371
Method: "GET",
1372-
Resource: "/api/2.0/pipelines?max_results=50",
1372+
Resource: "/api/2.0/pipelines?max_results=100",
13731373
Response: pipelines.ListPipelinesResponse{
13741374
Statuses: []pipelines.PipelineStateInfo{
13751375
{

0 commit comments

Comments
 (0)