1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
@@ -6,6 +6,7 @@

### New Features and Improvements

+ * Relaxed `force_new` constraint on `catalog` attribute in `databricks_pipeline` resource to allow changing the default catalog for existing pipelines ([#5180](https://github.com/databricks/terraform-provider-databricks/issues/5180)).
* Add `databricks_users` data source ([#4028](https://github.com/databricks/terraform-provider-databricks/pull/4028))
* Improve `databricks_service_principals` data source ([#5164](https://github.com/databricks/terraform-provider-databricks/pull/5164))

2 changes: 1 addition & 1 deletion docs/resources/pipeline.md
@@ -81,7 +81,7 @@ resource "databricks_pipeline" "this" {
The following arguments are supported:

* `name` - A user-friendly name for this pipeline. The name can be used to identify pipeline jobs in the UI.
- * `catalog` - The name of catalog in Unity Catalog. *Change of this parameter forces recreation of the pipeline.* (Conflicts with `storage`).
+ * `catalog` - The name of the default catalog in Unity Catalog. *Changing this parameter forces recreation of the pipeline only if you switch from `storage` to `catalog` or vice versa; if the pipeline was already created with `catalog` set, the value can be changed in place* (see the example after this list). (Conflicts with `storage`).
* `schema` - (Optional, String, Conflicts with `target`) The default schema (database) where tables are read from or published to. The presence of this attribute implies that the pipeline is in direct publishing mode.
* `storage` - A location on cloud storage where output data and metadata required for pipeline execution are stored. By default, tables are stored in a subdirectory of this location. *Change of this parameter forces recreation of the pipeline.* (Conflicts with `catalog`).
* `target` - (Optional, String, Conflicts with `schema`) The name of a database (in either the Hive metastore or in a UC catalog) for persisting pipeline output data. Configuring the target setting allows you to view and query the pipeline output data from the Databricks UI.
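
For example, with the relaxed constraint the default catalog of an existing Unity Catalog pipeline can be updated without replacement. A minimal sketch (the pipeline, catalog, and notebook names are hypothetical):

```hcl
resource "databricks_pipeline" "this" {
  name    = "Example pipeline"
  catalog = "main" # changing this to e.g. "dev" now updates the pipeline in place

  library {
    notebook {
      path = "/Test"
    }
  }
}
```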
20 changes: 19 additions & 1 deletion pipelines/resource_pipeline.go
@@ -198,7 +198,8 @@ func (Pipeline) CustomizeSchema(s *common.CustomizableSchema) *common.CustomizableSchema {

// ForceNew fields
s.SchemaPath("storage").SetForceNew()
s.SchemaPath("catalog").SetForceNew()
// catalog can be updated in-place, but switching between storage and catalog requires recreation
// (handled in CustomizeDiff)
s.SchemaPath("gateway_definition", "connection_id").SetForceNew()
s.SchemaPath("gateway_definition", "gateway_storage_catalog").SetForceNew()
s.SchemaPath("gateway_definition", "gateway_storage_schema").SetForceNew()
@@ -335,5 +336,22 @@ func ResourcePipeline() common.Resource {
Timeouts: &schema.ResourceTimeout{
Default: schema.DefaultTimeout(DefaultTimeout),
},
+ CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff, c *common.DatabricksClient) error {
+ 	// Allow changing the catalog value on existing pipelines, but force recreation
+ 	// when switching between storage and catalog (or vice versa).
+ 	// This should only run on update, so we skip the check when the ID is not yet known.
+ 	if d.Id() == "" {
+ 		return nil
+ 	}
+
+ 	// If both storage and catalog changed, we're switching between storage and catalog modes.
+ 	if d.HasChange("storage") && d.HasChange("catalog") {
+ 		if err := d.ForceNew("catalog"); err != nil {
+ 			return err
+ 		}
+ 		return d.ForceNew("storage")
+ 	}
+ 	return nil
+ },
}
}
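
To illustrate: the `CustomizeDiff` branch above only fires when a single plan changes both attributes, i.e. when the pipeline is switched between storage mode and Unity Catalog mode. A hypothetical config change that would trigger recreation (names are illustrative):

```hcl
resource "databricks_pipeline" "this" {
  name = "Example pipeline"

  # Before: storage mode
  # storage = "/test/storage"

  # After: Unity Catalog mode. `storage` was removed and `catalog` was added,
  # so both attributes change in the same plan and the CustomizeDiff marks
  # them ForceNew, recreating the pipeline.
  catalog = "main"
}
```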
171 changes: 171 additions & 0 deletions pipelines/resource_pipeline_test.go
@@ -806,3 +806,174 @@ func TestDefault(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "abcd", d.Id())
}

func TestUpdatePipelineCatalogInPlace(t *testing.T) {
state := pipelines.PipelineStateRunning
spec := pipelines.PipelineSpec{
Id: "abcd",
Name: "test",
Catalog: "new_catalog",
Libraries: []pipelines.PipelineLibrary{
{
Notebook: &pipelines.NotebookLibrary{
Path: "/Test",
},
},
},
Filters: &pipelines.Filters{
Include: []string{"com.databricks.include"},
},
Channel: "CURRENT",
Edition: "ADVANCED",
}
qa.ResourceFixture{
MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) {
e := w.GetMockPipelinesAPI().EXPECT()
e.Update(mock.Anything, pipelines.EditPipeline{
Id: "abcd",
PipelineId: "abcd",
Name: "test",
Catalog: "new_catalog",
Libraries: []pipelines.PipelineLibrary{
{
Notebook: &pipelines.NotebookLibrary{
Path: "/Test",
},
},
},
Filters: &pipelines.Filters{
Include: []string{"com.databricks.include"},
},
Channel: "CURRENT",
Edition: "ADVANCED",
}).Return(nil)
e.Get(mock.Anything, pipelines.GetPipelineRequest{
PipelineId: "abcd",
}).Return(&pipelines.GetPipelineResponse{
PipelineId: "abcd",
Spec: &spec,
State: state,
}, nil).Twice()
},
Resource: ResourcePipeline(),
HCL: `name = "test"
catalog = "new_catalog"
library {
notebook {
path = "/Test"
}
}
filters {
include = [ "com.databricks.include" ]
}`,
InstanceState: map[string]string{
"name": "test",
"catalog": "old_catalog",
},
Update: true,
ID: "abcd",
}.ApplyAndExpectData(t, map[string]any{
"id": "abcd",
"catalog": "new_catalog",
})
}

func TestUpdatePipelineStorageToCatalogForceNew(t *testing.T) {
state := pipelines.PipelineStateRunning
spec := pipelines.PipelineSpec{
Id: "abcd",
Name: "test",
Storage: "/test/storage",
Libraries: []pipelines.PipelineLibrary{
{
Notebook: &pipelines.NotebookLibrary{
Path: "/Test",
},
},
},
Filters: &pipelines.Filters{
Include: []string{"com.databricks.include"},
},
Channel: "CURRENT",
Edition: "ADVANCED",
}
qa.ResourceFixture{
MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) {
e := w.GetMockPipelinesAPI().EXPECT()
e.Update(mock.Anything, mock.Anything).Return(nil)
e.Get(mock.Anything, pipelines.GetPipelineRequest{
PipelineId: "abcd",
}).Return(&pipelines.GetPipelineResponse{
PipelineId: "abcd",
Spec: &spec,
State: state,
}, nil)
},
RequiresNew: true,
Resource: ResourcePipeline(),
Update: true,
ID: "abcd",
InstanceState: map[string]string{
"name": "test",
"storage": "/test/storage",
},
HCL: `
name = "test"
catalog = "new_catalog"
library {
notebook {
path = "/Test"
}
}`,
}.ApplyNoError(t)
}

func TestUpdatePipelineCatalogToStorageForceNew(t *testing.T) {
state := pipelines.PipelineStateRunning
spec := pipelines.PipelineSpec{
Id: "abcd",
Name: "test",
Catalog: "old_catalog",
Libraries: []pipelines.PipelineLibrary{
{
Notebook: &pipelines.NotebookLibrary{
Path: "/Test",
},
},
},
Filters: &pipelines.Filters{
Include: []string{"com.databricks.include"},
},
Channel: "CURRENT",
Edition: "ADVANCED",
}
qa.ResourceFixture{
MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) {
e := w.GetMockPipelinesAPI().EXPECT()
e.Update(mock.Anything, mock.Anything).Return(nil)
e.Get(mock.Anything, pipelines.GetPipelineRequest{
PipelineId: "abcd",
}).Return(&pipelines.GetPipelineResponse{
PipelineId: "abcd",
Spec: &spec,
State: state,
}, nil)
},
RequiresNew: true,
Resource: ResourcePipeline(),
Update: true,
ID: "abcd",
InstanceState: map[string]string{
"name": "test",
"catalog": "old_catalog",
},
HCL: `
name = "test"
storage = "/test/storage"
library {
notebook {
path = "/Test"
}
}`,
}.ApplyNoError(t)
}