122 changes: 120 additions & 2 deletions cmd/bundle/generate/generate_test.go
@@ -3,8 +3,10 @@ package generate
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"testing"
@@ -90,7 +92,7 @@ func TestGeneratePipelineCommand(t *testing.T) {
err := cmd.RunE(cmd, []string{})
require.NoError(t, err)

data, err := os.ReadFile(filepath.Join(configDir, "test_pipeline.yml"))
data, err := os.ReadFile(filepath.Join(configDir, "test_pipeline.pipeline.yml"))
require.NoError(t, err)
require.Equal(t, fmt.Sprintf(`resources:
pipelines:
@@ -186,7 +188,123 @@ func TestGenerateJobCommand(t *testing.T) {
err := cmd.RunE(cmd, []string{})
require.NoError(t, err)

data, err := os.ReadFile(filepath.Join(configDir, "test_job.yml"))
data, err := os.ReadFile(filepath.Join(configDir, "test_job.job.yml"))
require.NoError(t, err)

require.Equal(t, fmt.Sprintf(`resources:
jobs:
test_job:
name: test-job
job_clusters:
- new_cluster:
custom_tags:
"Tag1": "24X7-1234"
- new_cluster:
spark_conf:
"spark.databricks.delta.preview.enabled": "true"
tasks:
- task_key: notebook_task
notebook_task:
notebook_path: %s
parameters:
- name: empty
default: ""
`, filepath.Join("..", "src", "notebook.py")), string(data))

data, err = os.ReadFile(filepath.Join(srcDir, "notebook.py"))
require.NoError(t, err)
require.Equal(t, "# Databricks notebook source\nNotebook content", string(data))
}

func touchEmptyFile(t *testing.T, path string) {
err := os.MkdirAll(filepath.Dir(path), 0700)
require.NoError(t, err)
f, err := os.Create(path)
require.NoError(t, err)
f.Close()
}

func TestGenerateJobCommandOldFileRename(t *testing.T) {
cmd := NewGenerateJobCommand()

root := t.TempDir()
b := &bundle.Bundle{
BundleRootPath: root,
}

m := mocks.NewMockWorkspaceClient(t)
b.SetWorkpaceClient(m.WorkspaceClient)

jobsApi := m.GetMockJobsAPI()
jobsApi.EXPECT().Get(mock.Anything, jobs.GetJobRequest{JobId: 1234}).Return(&jobs.Job{
Settings: &jobs.JobSettings{
Name: "test-job",
JobClusters: []jobs.JobCluster{
{NewCluster: compute.ClusterSpec{
CustomTags: map[string]string{
"Tag1": "24X7-1234",
},
}},
{NewCluster: compute.ClusterSpec{
SparkConf: map[string]string{
"spark.databricks.delta.preview.enabled": "true",
},
}},
},
Tasks: []jobs.Task{
{
TaskKey: "notebook_task",
NotebookTask: &jobs.NotebookTask{
NotebookPath: "/test/notebook",
},
},
},
Parameters: []jobs.JobParameterDefinition{
{
Name: "empty",
Default: "",
},
},
},
}, nil)

workspaceApi := m.GetMockWorkspaceAPI()
workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/notebook").Return(&workspace.ObjectInfo{
ObjectType: workspace.ObjectTypeNotebook,
Language: workspace.LanguagePython,
Path: "/test/notebook",
}, nil)

notebookContent := io.NopCloser(bytes.NewBufferString("# Databricks notebook source\nNotebook content"))
workspaceApi.EXPECT().Download(mock.Anything, "/test/notebook", mock.Anything).Return(notebookContent, nil)

cmd.SetContext(bundle.Context(context.Background(), b))
cmd.Flag("existing-job-id").Value.Set("1234")

configDir := filepath.Join(root, "resources")
cmd.Flag("config-dir").Value.Set(configDir)

srcDir := filepath.Join(root, "src")
cmd.Flag("source-dir").Value.Set(srcDir)

var key string
cmd.Flags().StringVar(&key, "key", "test_job", "")

// Create an old generated file first
oldFilename := filepath.Join(configDir, "test_job.yml")
touchEmptyFile(t, oldFilename)

// Having an existing file requires the --force flag to regenerate it
cmd.Flag("force").Value.Set("true")

err := cmd.RunE(cmd, []string{})
require.NoError(t, err)

// Make sure the old file no longer exists after the run
_, err = os.Stat(oldFilename)
require.True(t, errors.Is(err, fs.ErrNotExist))

data, err := os.ReadFile(filepath.Join(configDir, "test_job.job.yml"))
require.NoError(t, err)

require.Equal(t, fmt.Sprintf(`resources:
14 changes: 13 additions & 1 deletion cmd/bundle/generate/job.go
@@ -1,7 +1,9 @@
package generate

import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"

@@ -83,7 +85,17 @@ func NewGenerateJobCommand() *cobra.Command {
return err
}

filename := filepath.Join(configDir, fmt.Sprintf("%s.yml", jobKey))
oldFilename := filepath.Join(configDir, fmt.Sprintf("%s.yml", jobKey))
filename := filepath.Join(configDir, fmt.Sprintf("%s.job.yml", jobKey))

// Users might run the generate command repeatedly to update their bundle jobs with any changes made in the Databricks UI.
// Because the generated file names have changed, we need to first rename the existing resource file to the new name.
// Otherwise users can end up with duplicated resources.
err = os.Rename(oldFilename, filename)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("failed to rename file %s. DABs uses the resource type as a sub-extension for generated content; please rename it to %s, err: %w", oldFilename, filename, err)
}

saver := yamlsaver.NewSaverWithStyle(map[string]yaml.Style{
// Including all JobSettings and nested fields which are map[string]string type
"spark_conf": yaml.DoubleQuotedStyle,
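Note that job.go and pipeline.go now duplicate the same rename-if-exists logic. A minimal standalone sketch of that pattern, assuming a hypothetical shared helper (migrateGeneratedFile is illustrative, not part of this PR):

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
)

// migrateGeneratedFile renames a previously generated "<key>.yml" file to the
// new "<key>.<resource-type>.yml" scheme. A missing old file is not an error:
// errors.Is(err, fs.ErrNotExist) matches the error os.Rename returns when the
// source does not exist, so first-time generation passes through cleanly.
func migrateGeneratedFile(oldPath, newPath string) error {
	err := os.Rename(oldPath, newPath)
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		return fmt.Errorf("failed to rename %s to %s: %w", oldPath, newPath, err)
	}
	return nil
}

func main() {
	// Example: migrate an old generated job file to the new sub-extension naming.
	if err := migrateGeneratedFile("resources/test_job.yml", "resources/test_job.job.yml"); err != nil {
		fmt.Println(err)
	}
}
```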
14 changes: 13 additions & 1 deletion cmd/bundle/generate/pipeline.go
@@ -1,7 +1,9 @@
package generate

import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"

@@ -83,7 +85,17 @@ func NewGeneratePipelineCommand() *cobra.Command {
return err
}

filename := filepath.Join(configDir, fmt.Sprintf("%s.yml", pipelineKey))
oldFilename := filepath.Join(configDir, fmt.Sprintf("%s.yml", pipelineKey))
filename := filepath.Join(configDir, fmt.Sprintf("%s.pipeline.yml", pipelineKey))

// Users might run the generate command repeatedly to update their bundle pipelines with any changes made in the Databricks UI.
// Because the generated file names have changed, we need to first rename the existing resource file to the new name.
// Otherwise users can end up with duplicated resources.
err = os.Rename(oldFilename, filename)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("failed to rename file %s. DABs uses the resource type as a sub-extension for generated content; please rename it to %s, err: %w", oldFilename, filename, err)
}

saver := yamlsaver.NewSaverWithStyle(
// Including all PipelineSpec and nested fields which are map[string]string type
map[string]yaml.Style{
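For context on the yaml.Style maps passed to yamlsaver.NewSaverWithStyle in both commands: in gopkg.in/yaml.v3, quoting is controlled per node via the Style field. A minimal standalone sketch (plain yaml.v3, not the yamlsaver internals) of why map[string]string values like "true" are forced to double-quoted style:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

func main() {
	// A plain-style scalar node with value "true" would emit unquoted YAML
	// that re-parses as a boolean; DoubleQuotedStyle keeps it a string.
	key := &yaml.Node{Kind: yaml.ScalarNode, Style: yaml.DoubleQuotedStyle,
		Value: "spark.databricks.delta.preview.enabled"}
	val := &yaml.Node{Kind: yaml.ScalarNode, Style: yaml.DoubleQuotedStyle,
		Value: "true"}
	doc := &yaml.Node{Kind: yaml.MappingNode, Content: []*yaml.Node{key, val}}

	out, err := yaml.Marshal(doc)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out)) // "spark.databricks.delta.preview.enabled": "true"
}
```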
2 changes: 1 addition & 1 deletion internal/bundle/bind_resource_test.go
@@ -166,7 +166,7 @@ func TestAccGenerateAndBind(t *testing.T) {
_, err = os.Stat(filepath.Join(bundleRoot, "src", "test.py"))
require.NoError(t, err)

matches, err := filepath.Glob(filepath.Join(bundleRoot, "resources", "test_job_key.yml"))
matches, err := filepath.Glob(filepath.Join(bundleRoot, "resources", "test_job_key.job.yml"))
require.NoError(t, err)

require.Len(t, matches, 1)