Skip to content

Commit 7bca37b

Browse files
authored
Added databricks_permissions for databricks_pipeline (#1361)
This fixes #1359
1 parent d15fecb commit 7bca37b

File tree

5 files changed

+164
-31
lines changed

5 files changed

+164
-31
lines changed

docs/resources/permissions.md

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,72 @@ resource "databricks_permissions" "job_usage" {
211211
}
212212
```
213213

214+
## Delta Live Tables usage
215+
216+
There are four assignable [permission levels](https://docs.databricks.com/security/access-control/dlt-acl.html#delta-live-tables-permissions) for [databricks_pipeline](pipeline.md): `CAN_VIEW`, `CAN_RUN`, `CAN_MANAGE`, and `IS_OWNER`. Admins are granted the `CAN_MANAGE` permission by default, and they can assign that permission to non-admin users, and service principals.
217+
218+
- The creator of a DLT Pipeline has `IS_OWNER` permission. Destroying `databricks_permissions` resource for a pipeline would revert ownership to the creator.
219+
- A DLT pipeline must have exactly one owner. If a resource is changed and no owner is specified, the currently authenticated principal would become the new owner of the pipeline. Nothing would change, per se, if the pipeline was created through Terraform.
220+
- A DLT pipeline cannot have a group as an owner.
221+
- DLT Pipelines triggered through _Start_ assume the permissions of the pipeline owner and not the user, and service principal who issued Run Now.
222+
- Read [main documentation](https://docs.databricks.com/security/access-control/dlt-acl.html) for additional detail.
223+
224+
```hcl
225+
resource "databricks_group" "eng" {
226+
display_name = "Engineering"
227+
}
228+
229+
resource "databricks_notebook" "dlt_demo" {
230+
content_base64 = base64encode(<<-EOT
231+
import dlt
232+
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
233+
@dlt.table(
234+
comment="The raw wikipedia clickstream dataset, ingested from /databricks-datasets."
235+
)
236+
def clickstream_raw():
237+
return (spark.read.format("json").load(json_path))
238+
EOT
239+
)
240+
language = "PYTHON"
241+
path = "${data.databricks_current_user.me.home}/DLT_Demo"
242+
}
243+
244+
resource "databricks_pipeline" "this" {
245+
name = "DLT Demo Pipeline (${data.databricks_current_user.me.alphanumeric})"
246+
storage = "/test/tf-pipeline"
247+
configuration = {
248+
key1 = "value1"
249+
key2 = "value2"
250+
}
251+
252+
library {
253+
notebook {
254+
path = databricks_notebook.dlt_demo.id
255+
}
256+
}
257+
258+
continuous = false
259+
filters {
260+
include = ["com.databricks.include"]
261+
exclude = ["com.databricks.exclude"]
262+
}
263+
}
264+
265+
resource "databricks_permissions" "dlt_usage" {
266+
pipeline_id = databricks_pipeline.this.id
267+
268+
access_control {
269+
group_name = "users"
270+
permission_level = "CAN_VIEW"
271+
}
272+
273+
access_control {
274+
group_name = databricks_group.eng.display_name
275+
permission_level = "CAN_MANAGE"
276+
}
277+
}
278+
```
279+
214280
## Notebook usage
215281

216282
Valid [permission levels](https://docs.databricks.com/security/access-control/workspace-acl.html#notebook-permissions) for [databricks_notebook](notebook.md) are: `CAN_READ`, `CAN_RUN`, `CAN_EDIT`, and `CAN_MANAGE`.

permissions/resource_permissions.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/databrickslabs/terraform-provider-databricks/common"
1313
"github.com/databrickslabs/terraform-provider-databricks/jobs"
14+
"github.com/databrickslabs/terraform-provider-databricks/pipelines"
1415
"github.com/databrickslabs/terraform-provider-databricks/scim"
1516

1617
"github.com/databrickslabs/terraform-provider-databricks/workspace"
@@ -150,7 +151,7 @@ func (a PermissionsAPI) Update(objectID string, objectACL AccessControlChangeLis
150151
PermissionLevel: "CAN_MANAGE",
151152
})
152153
}
153-
if strings.HasPrefix(objectID, "/jobs") {
154+
if strings.HasPrefix(objectID, "/jobs") || strings.HasPrefix(objectID, "/pipelines") {
154155
owners := 0
155156
for _, acl := range objectACL.AccessControlList {
156157
if acl.PermissionLevel == "IS_OWNER" {
@@ -196,6 +197,15 @@ func (a PermissionsAPI) Delete(objectID string) error {
196197
UserName: job.CreatorUserName,
197198
PermissionLevel: "IS_OWNER",
198199
})
200+
} else if strings.HasPrefix(objectID, "/pipelines") {
201+
job, err := pipelines.NewPipelinesAPI(a.context, a.client).Read(strings.ReplaceAll(objectID, "/pipelines/", ""))
202+
if err != nil {
203+
return err
204+
}
205+
accl.AccessControlList = append(accl.AccessControlList, AccessControlChange{
206+
UserName: job.CreatorUserName,
207+
PermissionLevel: "IS_OWNER",
208+
})
199209
}
200210
return a.put(objectID, accl)
201211
}
@@ -241,6 +251,7 @@ func permissionsResourceIDFields() []permissionsIDFieldMapping {
241251
{"cluster_policy_id", "cluster-policy", "cluster-policies", []string{"CAN_USE"}, SIMPLE},
242252
{"instance_pool_id", "instance-pool", "instance-pools", []string{"CAN_ATTACH_TO", "CAN_MANAGE"}, SIMPLE},
243253
{"cluster_id", "cluster", "clusters", []string{"CAN_ATTACH_TO", "CAN_RESTART", "CAN_MANAGE"}, SIMPLE},
254+
{"pipeline_id", "pipelines", "pipelines", []string{"CAN_VIEW", "CAN_RUN", "CAN_MANAGE", "IS_OWNER"}, SIMPLE},
244255
{"job_id", "job", "jobs", []string{"CAN_VIEW", "CAN_MANAGE_RUN", "IS_OWNER", "CAN_MANAGE"}, SIMPLE},
245256
{"notebook_id", "notebook", "notebooks", []string{"CAN_READ", "CAN_RUN", "CAN_EDIT", "CAN_MANAGE"}, SIMPLE},
246257
{"notebook_path", "notebook", "notebooks", []string{"CAN_READ", "CAN_RUN", "CAN_EDIT", "CAN_MANAGE"}, PATH},

permissions/resource_permissions_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,61 @@ func TestShouldKeepAdminsOnAnythingExceptPasswordsAndAssignsOwnerForJob(t *testi
986986
})
987987
}
988988

989+
func TestShouldKeepAdminsOnAnythingExceptPasswordsAndAssignsOwnerForPipeline(t *testing.T) {
990+
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
991+
{
992+
Method: "GET",
993+
Resource: "/api/2.0/permissions/pipelines/123",
994+
Response: ObjectACL{
995+
ObjectID: "/pipelines/123",
996+
ObjectType: "pipeline",
997+
AccessControlList: []AccessControl{
998+
{
999+
GroupName: "admins",
1000+
AllPermissions: []Permission{
1001+
{
1002+
PermissionLevel: "CAN_DO_EVERYTHING",
1003+
Inherited: true,
1004+
},
1005+
{
1006+
PermissionLevel: "CAN_MANAGE",
1007+
Inherited: false,
1008+
},
1009+
},
1010+
},
1011+
},
1012+
},
1013+
},
1014+
{
1015+
Method: "GET",
1016+
Resource: "/api/2.0/pipelines/123",
1017+
Response: jobs.Job{
1018+
CreatorUserName: "[email protected]",
1019+
},
1020+
},
1021+
{
1022+
Method: "PUT",
1023+
Resource: "/api/2.0/permissions/pipelines/123",
1024+
ExpectedRequest: ObjectACL{
1025+
AccessControlList: []AccessControl{
1026+
{
1027+
GroupName: "admins",
1028+
PermissionLevel: "CAN_MANAGE",
1029+
},
1030+
{
1031+
UserName: "[email protected]",
1032+
PermissionLevel: "IS_OWNER",
1033+
},
1034+
},
1035+
},
1036+
},
1037+
}, func(ctx context.Context, client *common.DatabricksClient) {
1038+
p := NewPermissionsAPI(ctx, client)
1039+
err := p.Delete("/pipelines/123")
1040+
assert.NoError(t, err)
1041+
})
1042+
}
1043+
9891044
func TestCustomizeDiffNoHostYet(t *testing.T) {
9901045
assert.Nil(t, ResourcePermissions().CustomizeDiff(context.TODO(), nil, &common.DatabricksClient{}))
9911046
}

pipelines/resource_pipeline.go

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -99,26 +99,27 @@ const (
9999
HealthStatusUnhealthy PipelineHealthStatus = "UNHEALTHY"
100100
)
101101

102-
type pipelineInfo struct {
103-
PipelineID string `json:"pipeline_id"`
104-
Spec *pipelineSpec `json:"spec"`
105-
State *PipelineState `json:"state"`
106-
Cause string `json:"cause"`
107-
ClusterID string `json:"cluster_id"`
108-
Name string `json:"name"`
109-
Health *PipelineHealthStatus `json:"health"`
102+
type PipelineInfo struct {
103+
PipelineID string `json:"pipeline_id"`
104+
Spec *pipelineSpec `json:"spec"`
105+
State *PipelineState `json:"state"`
106+
Cause string `json:"cause"`
107+
ClusterID string `json:"cluster_id"`
108+
Name string `json:"name"`
109+
Health *PipelineHealthStatus `json:"health"`
110+
CreatorUserName string `json:"creator_user_name"`
110111
}
111112

112-
type pipelinesAPI struct {
113+
type PipelinesAPI struct {
113114
client *common.DatabricksClient
114115
ctx context.Context
115116
}
116117

117-
func newPipelinesAPI(ctx context.Context, m interface{}) pipelinesAPI {
118-
return pipelinesAPI{m.(*common.DatabricksClient), ctx}
118+
func NewPipelinesAPI(ctx context.Context, m interface{}) PipelinesAPI {
119+
return PipelinesAPI{m.(*common.DatabricksClient), ctx}
119120
}
120121

121-
func (a pipelinesAPI) create(s pipelineSpec, timeout time.Duration) (string, error) {
122+
func (a PipelinesAPI) Create(s pipelineSpec, timeout time.Duration) (string, error) {
122123
var resp createPipelineResponse
123124
err := a.client.Post(a.ctx, "/pipelines", s, &resp)
124125
if err != nil {
@@ -128,7 +129,7 @@ func (a pipelinesAPI) create(s pipelineSpec, timeout time.Duration) (string, err
128129
err = a.waitForState(id, timeout, StateRunning)
129130
if err != nil {
130131
log.Printf("[INFO] Pipeline creation failed, attempting to clean up pipeline %s", id)
131-
err2 := a.delete(id, timeout)
132+
err2 := a.Delete(id, timeout)
132133
if err2 != nil {
133134
log.Printf("[WARN] Unable to delete pipeline %s; this resource needs to be manually cleaned up", id)
134135
return "", fmt.Errorf("multiple errors occurred when creating pipeline. Error while waiting for creation: \"%v\"; error while attempting to clean up failed pipeline: \"%v\"", err, err2)
@@ -139,27 +140,27 @@ func (a pipelinesAPI) create(s pipelineSpec, timeout time.Duration) (string, err
139140
return id, nil
140141
}
141142

142-
func (a pipelinesAPI) read(id string) (p pipelineInfo, err error) {
143+
func (a PipelinesAPI) Read(id string) (p PipelineInfo, err error) {
143144
err = a.client.Get(a.ctx, "/pipelines/"+id, nil, &p)
144145
return
145146
}
146147

147-
func (a pipelinesAPI) update(id string, s pipelineSpec, timeout time.Duration) error {
148+
func (a PipelinesAPI) Update(id string, s pipelineSpec, timeout time.Duration) error {
148149
err := a.client.Put(a.ctx, "/pipelines/"+id, s)
149150
if err != nil {
150151
return err
151152
}
152153
return a.waitForState(id, timeout, StateRunning)
153154
}
154155

155-
func (a pipelinesAPI) delete(id string, timeout time.Duration) error {
156+
func (a PipelinesAPI) Delete(id string, timeout time.Duration) error {
156157
err := a.client.Delete(a.ctx, "/pipelines/"+id, map[string]string{})
157158
if err != nil {
158159
return err
159160
}
160161
return resource.RetryContext(a.ctx, timeout,
161162
func() *resource.RetryError {
162-
i, err := a.read(id)
163+
i, err := a.Read(id)
163164
if err != nil {
164165
if common.IsMissing(err) {
165166
return nil
@@ -172,10 +173,10 @@ func (a pipelinesAPI) delete(id string, timeout time.Duration) error {
172173
})
173174
}
174175

175-
func (a pipelinesAPI) waitForState(id string, timeout time.Duration, desiredState PipelineState) error {
176+
func (a PipelinesAPI) waitForState(id string, timeout time.Duration, desiredState PipelineState) error {
176177
return resource.RetryContext(a.ctx, timeout,
177178
func() *resource.RetryError {
178-
i, err := a.read(id)
179+
i, err := a.Read(id)
179180
if err != nil {
180181
return resource.NonRetryableError(err)
181182
}
@@ -229,8 +230,8 @@ func ResourcePipeline() *schema.Resource {
229230
Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
230231
var s pipelineSpec
231232
common.DataToStructPointer(d, pipelineSchema, &s)
232-
api := newPipelinesAPI(ctx, c)
233-
id, err := api.create(s, d.Timeout(schema.TimeoutCreate))
233+
api := NewPipelinesAPI(ctx, c)
234+
id, err := api.Create(s, d.Timeout(schema.TimeoutCreate))
234235
if err != nil {
235236
return err
236237
}
@@ -239,7 +240,7 @@ func ResourcePipeline() *schema.Resource {
239240
return nil
240241
},
241242
Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
242-
i, err := newPipelinesAPI(ctx, c).read(d.Id())
243+
i, err := NewPipelinesAPI(ctx, c).Read(d.Id())
243244
if err != nil {
244245
return err
245246
}
@@ -251,11 +252,11 @@ func ResourcePipeline() *schema.Resource {
251252
Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
252253
var s pipelineSpec
253254
common.DataToStructPointer(d, pipelineSchema, &s)
254-
return newPipelinesAPI(ctx, c).update(d.Id(), s, d.Timeout(schema.TimeoutUpdate))
255+
return NewPipelinesAPI(ctx, c).Update(d.Id(), s, d.Timeout(schema.TimeoutUpdate))
255256
},
256257
Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
257-
api := newPipelinesAPI(ctx, c)
258-
return api.delete(d.Id(), d.Timeout(schema.TimeoutDelete))
258+
api := NewPipelinesAPI(ctx, c)
259+
return api.Delete(d.Id(), d.Timeout(schema.TimeoutDelete))
259260
},
260261
Timeouts: &schema.ResourceTimeout{
261262
Default: schema.DefaultTimeout(DefaultTimeout),

pipelines/resource_pipeline_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ func TestResourcePipelineRead(t *testing.T) {
248248
{
249249
Method: "GET",
250250
Resource: "/api/2.0/pipelines/abcd",
251-
Response: pipelineInfo{
251+
Response: PipelineInfo{
252252
PipelineID: "abcd",
253253
Spec: &basicPipelineSpec,
254254
},
@@ -335,7 +335,7 @@ func TestResourcePipelineUpdate(t *testing.T) {
335335
{
336336
Method: "GET",
337337
Resource: "/api/2.0/pipelines/abcd",
338-
Response: pipelineInfo{
338+
Response: PipelineInfo{
339339
PipelineID: "abcd",
340340
Spec: &spec,
341341
State: &state,
@@ -344,7 +344,7 @@ func TestResourcePipelineUpdate(t *testing.T) {
344344
{
345345
Method: "GET",
346346
Resource: "/api/2.0/pipelines/abcd",
347-
Response: pipelineInfo{
347+
Response: PipelineInfo{
348348
PipelineID: "abcd",
349349
Spec: &spec,
350350
State: &state,
@@ -427,7 +427,7 @@ func TestResourcePipelineUpdate_FailsAfterUpdate(t *testing.T) {
427427
{
428428
Method: "GET",
429429
Resource: "/api/2.0/pipelines/abcd",
430-
Response: pipelineInfo{
430+
Response: PipelineInfo{
431431
PipelineID: "abcd",
432432
Spec: &spec,
433433
State: &state,
@@ -463,7 +463,7 @@ func TestResourcePipelineDelete(t *testing.T) {
463463
{
464464
Method: "GET",
465465
Resource: "/api/2.0/pipelines/abcd",
466-
Response: pipelineInfo{
466+
Response: PipelineInfo{
467467
PipelineID: "abcd",
468468
Spec: &basicPipelineSpec,
469469
State: &state,

0 commit comments

Comments
 (0)