Skip to content

Commit c2cbbe1

Browse files
Use filer when running generate on DBR
1 parent 2141440 commit c2cbbe1

File tree

10 files changed

+231
-72
lines changed

10 files changed

+231
-72
lines changed

bundle/generate/downloader.go

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
package generate
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
67
"io"
78
"net/http"
8-
"os"
99
"path"
1010
"path/filepath"
1111
"strings"
1212

1313
"github.com/databricks/cli/libs/cmdio"
14+
"github.com/databricks/cli/libs/filer"
1415
"github.com/databricks/cli/libs/notebook"
1516
"github.com/databricks/databricks-sdk-go"
1617
"github.com/databricks/databricks-sdk-go/service/jobs"
@@ -27,11 +28,12 @@ type exportFile struct {
2728
}
2829

2930
type Downloader struct {
30-
files map[string]exportFile
31-
w *databricks.WorkspaceClient
32-
sourceDir string
33-
configDir string
34-
basePath string
31+
files map[string]exportFile
32+
w *databricks.WorkspaceClient
33+
sourceDir string
34+
configDir string
35+
basePath string
36+
outputFiler filer.Filer
3537
}
3638

3739
func (n *Downloader) MarkTaskForDownload(ctx context.Context, task *jobs.Task) error {
@@ -194,7 +196,7 @@ func (n *Downloader) relativePath(fullPath string) string {
194196
func (n *Downloader) FlushToDisk(ctx context.Context, force bool) error {
195197
// First check that all files can be written
196198
for targetPath := range n.files {
197-
info, err := os.Stat(targetPath)
199+
info, err := n.outputFiler.Stat(ctx, targetPath)
198200
if err == nil {
199201
if info.IsDir() {
200202
return fmt.Errorf("%s is a directory", targetPath)
@@ -207,42 +209,42 @@ func (n *Downloader) FlushToDisk(ctx context.Context, force bool) error {
207209

208210
errs, errCtx := errgroup.WithContext(ctx)
209211
for targetPath, exportFile := range n.files {
210-
// Create parent directories if they don't exist
211-
dir := filepath.Dir(targetPath)
212-
err := os.MkdirAll(dir, 0o755)
213-
if err != nil {
214-
return err
215-
}
216212
errs.Go(func() error {
217213
reader, err := n.w.Workspace.Download(errCtx, exportFile.path, workspace.DownloadFormat(exportFile.format))
218214
if err != nil {
219215
return err
220216
}
217+
defer reader.Close()
221218

222-
file, err := os.Create(targetPath)
219+
// Read into buffer so we can write via the filer
220+
content, err := io.ReadAll(reader)
223221
if err != nil {
224222
return err
225223
}
226-
defer file.Close()
227224

228-
_, err = io.Copy(file, reader)
225+
mode := []filer.WriteMode{filer.CreateParentDirectories}
226+
if force {
227+
mode = append(mode, filer.OverwriteIfExists)
228+
}
229+
err = n.outputFiler.Write(errCtx, targetPath, bytes.NewReader(content), mode...)
229230
if err != nil {
230231
return err
231232
}
232233

233234
cmdio.LogString(errCtx, "File successfully saved to "+targetPath)
234-
return reader.Close()
235+
return nil
235236
})
236237
}
237238

238239
return errs.Wait()
239240
}
240241

241-
func NewDownloader(w *databricks.WorkspaceClient, sourceDir, configDir string) *Downloader {
242+
func NewDownloader(w *databricks.WorkspaceClient, sourceDir, configDir string, outputFiler filer.Filer) *Downloader {
242243
return &Downloader{
243-
files: make(map[string]exportFile),
244-
w: w,
245-
sourceDir: sourceDir,
246-
configDir: configDir,
244+
files: make(map[string]exportFile),
245+
w: w,
246+
sourceDir: sourceDir,
247+
configDir: configDir,
248+
outputFiler: outputFiler,
247249
}
248250
}

bundle/generate/downloader_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"path/filepath"
66
"testing"
77

8+
"github.com/databricks/cli/libs/fakefs"
9+
"github.com/databricks/cli/libs/filer"
810
"github.com/databricks/databricks-sdk-go/experimental/mocks"
911
"github.com/databricks/databricks-sdk-go/service/workspace"
1012
"github.com/stretchr/testify/assert"
@@ -18,7 +20,8 @@ func TestDownloader_MarkFileReturnsRelativePath(t *testing.T) {
1820
dir := "base/dir/doesnt/matter"
1921
sourceDir := filepath.Join(dir, "source")
2022
configDir := filepath.Join(dir, "config")
21-
downloader := NewDownloader(m.WorkspaceClient, sourceDir, configDir)
23+
fakeFiler := filer.NewFakeFiler(map[string]fakefs.FileInfo{})
24+
downloader := NewDownloader(m.WorkspaceClient, sourceDir, configDir, fakeFiler)
2225

2326
var err error
2427

cmd/bundle/generate/app.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/databricks/cli/libs/cmdio"
1111
"github.com/databricks/cli/libs/dyn"
1212
"github.com/databricks/cli/libs/dyn/yamlsaver"
13+
"github.com/databricks/cli/libs/filer"
1314
"github.com/databricks/cli/libs/logdiag"
1415
"github.com/databricks/cli/libs/textutil"
1516
"github.com/databricks/databricks-sdk-go/service/apps"
@@ -77,7 +78,12 @@ per target environment.`,
7778
return err
7879
}
7980

80-
downloader := generate.NewDownloader(w, sourceDir, configDir)
81+
outputFiler, err := filer.NewOutputFiler(ctx, w, b.BundleRootPath)
82+
if err != nil {
83+
return err
84+
}
85+
86+
downloader := generate.NewDownloader(w, sourceDir, configDir, outputFiler)
8187

8288
sourceCodePath := app.DefaultSourceCodePath
8389
// If the source code path is not set, we don't need to download anything.
@@ -121,7 +127,7 @@ per target environment.`,
121127
filename := filepath.Join(configDir, appKey+".app.yml")
122128

123129
saver := yamlsaver.NewSaver()
124-
err = saver.SaveAsYAML(result, filename, force)
130+
err = saver.SaveAsYAMLToFiler(ctx, outputFiler, result, filename, force)
125131
if err != nil {
126132
return err
127133
}

cmd/bundle/generate/dashboard.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"errors"
88
"fmt"
99
"io"
10-
"os"
1110
"path"
1211
"path/filepath"
1312
"strings"
@@ -25,6 +24,7 @@ import (
2524
"github.com/databricks/cli/libs/diag"
2625
"github.com/databricks/cli/libs/dyn"
2726
"github.com/databricks/cli/libs/dyn/yamlsaver"
27+
"github.com/databricks/cli/libs/filer"
2828
"github.com/databricks/cli/libs/logdiag"
2929
"github.com/databricks/cli/libs/textutil"
3030
"github.com/databricks/databricks-sdk-go"
@@ -66,6 +66,9 @@ type dashboard struct {
6666
// Output and error streams.
6767
out io.Writer
6868
err io.Writer
69+
70+
// Output filer for writing files.
71+
outputFiler filer.Filer
6972
}
7073

7174
func (d *dashboard) resolveID(ctx context.Context, b *bundle.Bundle) string {
@@ -165,19 +168,14 @@ func remarshalJSON(data []byte) ([]byte, error) {
165168
return buf.Bytes(), nil
166169
}
167170

168-
func (d *dashboard) saveSerializedDashboard(_ context.Context, b *bundle.Bundle, dashboard *dashboards.Dashboard, filename string) error {
171+
func (d *dashboard) saveSerializedDashboard(ctx context.Context, b *bundle.Bundle, dashboard *dashboards.Dashboard, filename string) error {
169172
// Unmarshal and remarshal the serialized dashboard to ensure it is formatted correctly.
170173
// The result will have alphabetically sorted keys and be indented.
171174
data, err := remarshalJSON([]byte(dashboard.SerializedDashboard))
172175
if err != nil {
173176
return err
174177
}
175178

176-
// Make sure the output directory exists.
177-
if err := os.MkdirAll(filepath.Dir(filename), 0o755); err != nil {
178-
return err
179-
}
180-
181179
// Clean the filename to ensure it is a valid path (and can be used on this OS).
182180
filename = filepath.Clean(filename)
183181

@@ -188,7 +186,7 @@ func (d *dashboard) saveSerializedDashboard(_ context.Context, b *bundle.Bundle,
188186
}
189187

190188
// Verify that the file does not already exist.
191-
info, err := os.Stat(filename)
189+
info, err := d.outputFiler.Stat(ctx, filename)
192190
if err == nil {
193191
if info.IsDir() {
194192
return fmt.Errorf("%s is a directory", rel)
@@ -199,7 +197,12 @@ func (d *dashboard) saveSerializedDashboard(_ context.Context, b *bundle.Bundle,
199197
}
200198

201199
fmt.Fprintf(d.out, "Writing dashboard to %q\n", rel)
202-
return os.WriteFile(filename, data, 0o644)
200+
201+
mode := []filer.WriteMode{filer.CreateParentDirectories}
202+
if d.force {
203+
mode = append(mode, filer.OverwriteIfExists)
204+
}
205+
return d.outputFiler.Write(ctx, filename, bytes.NewReader(data), mode...)
203206
}
204207

205208
func (d *dashboard) saveConfiguration(ctx context.Context, b *bundle.Bundle, dashboard *dashboards.Dashboard, key string) error {
@@ -225,11 +228,6 @@ func (d *dashboard) saveConfiguration(ctx context.Context, b *bundle.Bundle, das
225228
}),
226229
}
227230

228-
// Make sure the output directory exists.
229-
if err := os.MkdirAll(d.resourceDir, 0o755); err != nil {
230-
return err
231-
}
232-
233231
// Save the configuration to the resource directory.
234232
resourcePath := filepath.Join(d.resourceDir, key+".dashboard.yml")
235233
saver := yamlsaver.NewSaverWithStyle(map[string]yaml.Style{
@@ -243,7 +241,7 @@ func (d *dashboard) saveConfiguration(ctx context.Context, b *bundle.Bundle, das
243241
}
244242

245243
fmt.Fprintf(d.out, "Writing configuration to %q\n", rel)
246-
err = saver.SaveAsYAML(result, resourcePath, d.force)
244+
err = saver.SaveAsYAMLToFiler(ctx, d.outputFiler, result, resourcePath, d.force)
247245
if err != nil {
248246
return err
249247
}
@@ -370,6 +368,14 @@ func (d *dashboard) initialize(ctx context.Context, b *bundle.Bundle) {
370368
}
371369

372370
d.relativeDashboardDir = filepath.ToSlash(rel)
371+
372+
// Construct output filer for writing files.
373+
outputFiler, err := filer.NewOutputFiler(ctx, b.WorkspaceClient(), b.BundleRootPath)
374+
if err != nil {
375+
logdiag.LogError(ctx, err)
376+
return
377+
}
378+
d.outputFiler = outputFiler
373379
}
374380

375381
func (d *dashboard) runForResource(ctx context.Context, b *bundle.Bundle) {

cmd/bundle/generate/job.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package generate
22

33
import (
4+
"bytes"
5+
"context"
46
"errors"
57
"fmt"
8+
"io"
69
"io/fs"
7-
"os"
810
"path/filepath"
911
"strconv"
1012

@@ -14,13 +16,39 @@ import (
1416
"github.com/databricks/cli/libs/cmdio"
1517
"github.com/databricks/cli/libs/dyn"
1618
"github.com/databricks/cli/libs/dyn/yamlsaver"
19+
"github.com/databricks/cli/libs/filer"
1720
"github.com/databricks/cli/libs/logdiag"
1821
"github.com/databricks/cli/libs/textutil"
1922
"github.com/databricks/databricks-sdk-go/service/jobs"
2023
"github.com/spf13/cobra"
2124
"gopkg.in/yaml.v3"
2225
)
2326

27+
// filerRename renames a file using filer operations (read, write, delete).
28+
// This is needed because the filer interface doesn't have a native rename method.
29+
func filerRename(ctx context.Context, f filer.Filer, oldPath, newPath string) error {
30+
// Read the old file
31+
r, err := f.Read(ctx, oldPath)
32+
if err != nil {
33+
return err
34+
}
35+
defer r.Close()
36+
37+
content, err := io.ReadAll(r)
38+
if err != nil {
39+
return err
40+
}
41+
42+
// Write to new path
43+
err = f.Write(ctx, newPath, bytes.NewReader(content), filer.CreateParentDirectories, filer.OverwriteIfExists)
44+
if err != nil {
45+
return err
46+
}
47+
48+
// Delete the old file
49+
return f.Delete(ctx, oldPath)
50+
}
51+
2452
func NewGenerateJobCommand() *cobra.Command {
2553
var configDir string
2654
var sourceDir string
@@ -80,7 +108,12 @@ After generation, you can deploy this job to other targets using:
80108
return err
81109
}
82110

83-
downloader := generate.NewDownloader(w, sourceDir, configDir)
111+
outputFiler, err := filer.NewOutputFiler(ctx, w, b.BundleRootPath)
112+
if err != nil {
113+
return err
114+
}
115+
116+
downloader := generate.NewDownloader(w, sourceDir, configDir, outputFiler)
84117

85118
// Don't download files if the job is using Git source
86119
// When Git source is used, the job will be using the files from the Git repository
@@ -129,7 +162,7 @@ After generation, you can deploy this job to other targets using:
129162
// User might continuously run generate command to update their bundle jobs with any changes made in Databricks UI.
130163
// Due to changing in the generated file names, we need to first rename existing resource file to the new name.
131164
// Otherwise users can end up with duplicated resources.
132-
err = os.Rename(oldFilename, filename)
165+
err = filerRename(ctx, outputFiler, oldFilename, filename)
133166
if err != nil && !errors.Is(err, fs.ErrNotExist) {
134167
return fmt.Errorf("failed to rename file %s. DABs uses the resource type as a sub-extension for generated content, please rename it to %s, err: %w", oldFilename, filename, err)
135168
}
@@ -140,7 +173,7 @@ After generation, you can deploy this job to other targets using:
140173
"custom_tags": yaml.DoubleQuotedStyle,
141174
"tags": yaml.DoubleQuotedStyle,
142175
})
143-
err = saver.SaveAsYAML(result, filename, force)
176+
err = saver.SaveAsYAMLToFiler(ctx, outputFiler, result, filename, force)
144177
if err != nil {
145178
return err
146179
}

cmd/bundle/generate/pipeline.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"errors"
55
"fmt"
66
"io/fs"
7-
"os"
87
"path/filepath"
98

109
"github.com/databricks/cli/bundle/generate"
@@ -13,6 +12,7 @@ import (
1312
"github.com/databricks/cli/libs/cmdio"
1413
"github.com/databricks/cli/libs/dyn"
1514
"github.com/databricks/cli/libs/dyn/yamlsaver"
15+
"github.com/databricks/cli/libs/filer"
1616
"github.com/databricks/cli/libs/logdiag"
1717
"github.com/databricks/cli/libs/textutil"
1818
"github.com/databricks/databricks-sdk-go/service/pipelines"
@@ -79,7 +79,12 @@ like catalogs, schemas, and compute configurations per target.`,
7979
return err
8080
}
8181

82-
downloader := generate.NewDownloader(w, sourceDir, configDir)
82+
outputFiler, err := filer.NewOutputFiler(ctx, w, b.BundleRootPath)
83+
if err != nil {
84+
return err
85+
}
86+
87+
downloader := generate.NewDownloader(w, sourceDir, configDir, outputFiler)
8388
for _, lib := range pipeline.Spec.Libraries {
8489
err := downloader.MarkPipelineLibraryForDownload(ctx, &lib)
8590
if err != nil {
@@ -131,7 +136,7 @@ like catalogs, schemas, and compute configurations per target.`,
131136
// User might continuously run generate command to update their bundle jobs with any changes made in Databricks UI.
132137
// Due to changing in the generated file names, we need to first rename existing resource file to the new name.
133138
// Otherwise users can end up with duplicated resources.
134-
err = os.Rename(oldFilename, filename)
139+
err = filerRename(ctx, outputFiler, oldFilename, filename)
135140
if err != nil && !errors.Is(err, fs.ErrNotExist) {
136141
return fmt.Errorf("failed to rename file %s. DABs uses the resource type as a sub-extension for generated content, please rename it to %s, err: %w", oldFilename, filename, err)
137142
}
@@ -144,7 +149,7 @@ like catalogs, schemas, and compute configurations per target.`,
144149
"configuration": yaml.DoubleQuotedStyle,
145150
},
146151
)
147-
err = saver.SaveAsYAML(result, filename, force)
152+
err = saver.SaveAsYAMLToFiler(ctx, outputFiler, result, filename, force)
148153
if err != nil {
149154
return err
150155
}

0 commit comments

Comments
 (0)