Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions bundle/config/mutator/capture_schema_dependency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package mutator

import (
"context"
"fmt"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/diag"
)

// captureSchemaDependency is the field-less mutator type returned by
// CaptureSchemaDependency; it carries no state between calls.
type captureSchemaDependency struct{}

// CaptureSchemaDependency returns a mutator that rewrites implicit schema
// references in DLT pipelines and UC Volumes into the explicit
// `${resources.schemas.<schema_key>.name}` form.
//
// When a UC schema is defined in the bundle, pipelines and volumes may point at
// it via that interpolation syntax. Doing so lets TF capture the deploy-time
// dependency the pipeline or volume has on the schema, so that changes to the
// schema are deployed before the pipeline or volume that uses it.
func CaptureSchemaDependency() bundle.Mutator {
	return new(captureSchemaDependency)
}

// Name returns the identifier of this mutator.
func (m *captureSchemaDependency) Name() string {
	const mutatorName = "CaptureSchemaDependency"
	return mutatorName
}

// schemaNameRef builds the interpolation expression that refers to the name of
// the schema resource registered under key, e.g. key "foo" yields
// "${resources.schemas.foo.name}".
func schemaNameRef(key string) string {
	const refFormat = "${resources.schemas.%s.name}"
	return fmt.Sprintf(refFormat, key)
}

// findSchema looks for a schema resource defined in the bundle whose catalog
// name and schema name both equal the given values.
//
// It returns the resource key of the match together with the schema itself, or
// ("", nil) when either name is empty or no schema matches. Nil schema entries
// and entries without a CreateSchema payload are skipped.
//
// If several schema resources declare the same catalog/name pair, the one with
// the lexicographically smallest key is returned. Map iteration order is not
// specified in Go, so returning the first match encountered would make the
// generated reference vary from run to run.
func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *resources.Schema) {
	if catalogName == "" || schemaName == "" {
		return "", nil
	}

	var (
		matchKey string
		match    *resources.Schema
	)
	for k, s := range b.Config.Resources.Schemas {
		if s == nil || s.CreateSchema == nil {
			continue
		}
		if s.CatalogName != catalogName || s.Name != schemaName {
			continue
		}
		// Keep the smallest key so the result does not depend on iteration order.
		if match == nil || k < matchKey {
			matchKey, match = k, s
		}
	}
	return matchKey, match
}

// resolveVolume replaces the volume's schema name with an explicit
// `${resources.schemas.<schema_key>.name}` reference when the bundle defines a
// schema with the same catalog name and schema name. Nil volumes, volumes
// without a request payload, and volumes with no matching schema are left
// unchanged.
func resolveVolume(v *resources.Volume, b *bundle.Bundle) {
	if v == nil || v.CreateVolumeRequestContent == nil {
		return
	}

	if key, match := findSchema(b, v.CatalogName, v.SchemaName); match != nil {
		v.SchemaName = schemaNameRef(key)
	}
}

// resolvePipelineSchema replaces the pipeline's "schema" field with an explicit
// `${resources.schemas.<schema_key>.name}` reference when the bundle defines a
// schema matching the pipeline's catalog and schema. Nil pipelines, pipelines
// without a spec, pipelines with an empty schema, and pipelines with no
// matching schema are left unchanged.
func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) {
	if p == nil || p.PipelineSpec == nil || p.Schema == "" {
		return
	}

	key, match := findSchema(b, p.Catalog, p.Schema)
	if match != nil {
		p.Schema = schemaNameRef(key)
	}
}

// resolvePipelineTarget replaces the pipeline's "target" field with an explicit
// `${resources.schemas.<schema_key>.name}` reference when the bundle defines a
// schema matching the pipeline's catalog and target. Nil pipelines, pipelines
// without a spec, pipelines with an empty target, and pipelines with no
// matching schema are left unchanged.
func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) {
	if p == nil || p.PipelineSpec == nil || p.Target == "" {
		return
	}

	key, match := findSchema(b, p.Catalog, p.Target)
	if match != nil {
		p.Target = schemaNameRef(key)
	}
}

// Apply rewrites implicit schema references in every pipeline and volume of the
// bundle to the explicit `${resources.schemas.<schema_key>.name}` form. It
// always returns nil diagnostics.
func (m *captureSchemaDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
	for _, pipeline := range b.Config.Resources.Pipelines {
		// In the DLT API "schema" and "target" mean the same thing but are
		// mutually exclusive: only one of them can be set at a time. A pipeline
		// with "schema" set is in direct publishing mode and can write tables
		// to multiple schemas, whereas "target" limits it to a single schema.
		// Both fields are handled; whichever one is empty is skipped.
		resolvePipelineTarget(pipeline, b)
		resolvePipelineSchema(pipeline, b)
	}

	for _, volume := range b.Config.Resources.Volumes {
		resolveVolume(volume, b)
	}

	return nil
}
277 changes: 277 additions & 0 deletions bundle/config/mutator/capture_schema_dependency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package mutator

import (
"context"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCaptureSchemaDependencyForVolume(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": nil,
"emptyschema": {},
},
Volumes: map[string]*resources.Volume{
"volume1": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "foobar",
},
},
"volume2": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog2",
SchemaName: "foobar",
},
},
"volume3": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "barfoo",
},
},
"volume4": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalogX",
SchemaName: "foobar",
},
},
"volume5": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "schemaX",
},
},
"nilVolume": nil,
"emptyVolume": {},
},
},
},
}

d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)

assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Volumes["volume1"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Volumes["volume2"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Volumes["volume3"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "foobar", b.Config.Resources.Volumes["volume4"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "schemaX", b.Config.Resources.Volumes["volume5"].CreateVolumeRequestContent.SchemaName)

assert.Nil(t, b.Config.Resources.Volumes["nilVolume"])
assert.Nil(t, b.Config.Resources.Volumes["emptyVolume"].CreateVolumeRequestContent)
}

func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": nil,
"emptyschema": {},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "foobar",
},
},
"pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog2",
Schema: "foobar",
},
},
"pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "barfoo",
},
},
"pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalogX",
Schema: "foobar",
},
},
"pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "schemaX",
},
},
"pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Schema: "foobar",
},
},
"pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Schema: "",
Name: "whatever",
},
},
"nilPipeline": nil,
"emptyPipeline": {},
},
},
},
}

d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)

assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Pipelines["pipeline1"].Schema)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Pipelines["pipeline2"].Schema)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Pipelines["pipeline3"].Schema)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline4"].Schema)
assert.Equal(t, "schemaX", b.Config.Resources.Pipelines["pipeline5"].Schema)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Schema)
assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Schema)

assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"])
assert.Nil(t, b.Config.Resources.Pipelines["emptyPipeline"].PipelineSpec)

for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Target)
}
}

func TestCaptureSchemaDependencyForPipelinesWithSchema(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": nil,
"emptyschema": {},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "foobar",
},
},
"pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog2",
Target: "foobar",
},
},
"pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "barfoo",
},
},
"pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalogX",
Target: "foobar",
},
},
"pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "schemaX",
},
},
"pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Target: "foobar",
},
},
"pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Target: "",
Name: "whatever",
},
},
},
},
},
}

d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)
assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Pipelines["pipeline1"].Target)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Pipelines["pipeline2"].Target)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Pipelines["pipeline3"].Target)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline4"].Target)
assert.Equal(t, "schemaX", b.Config.Resources.Pipelines["pipeline5"].Target)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Target)
assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Target)

for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Schema)
}
}
2 changes: 2 additions & 0 deletions bundle/phases/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func Initialize() bundle.Mutator {
mutator.MergePipelineClusters(),
mutator.MergeApps(),

mutator.CaptureSchemaDependency(),

// Provide permission config errors & warnings after initializing all variables
permissions.PermissionDiagnostics(),
mutator.SetRunAs(),
Expand Down
Loading