Skip to content

Commit 33d98fb

Browse files
authored
Exporter: parallel listing of workspace objects (#2691)
* draft implementation of parallel listing of workspace objects * Add tests * Add N retries in case if listing fails * Address review comments
1 parent 323ae44 commit 33d98fb

File tree

4 files changed

+171
-2
lines changed

4 files changed

+171
-2
lines changed

exporter/importables.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,7 @@ var resourcesMap map[string]importable = map[string]importable{
11021102
return nil
11031103
},
11041104
List: func(ic *importContext) error {
1105+
// TODO: Should we use parallel listing instead?
11051106
repoList, err := repos.NewReposAPI(ic.Context, ic.Client).ListAll()
11061107
if err != nil {
11071108
return err

exporter/util.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,16 +160,21 @@ func (ic *importContext) getAllDirectories() []workspace.ObjectStatus {
160160

161161
func (ic *importContext) getAllWorkspaceObjects() []workspace.ObjectStatus {
162162
if len(ic.allWorkspaceObjects) == 0 {
163+
t1 := time.Now()
164+
log.Printf("[DEBUG] %v. Starting to list all workspace objects", t1.Local().Format(time.RFC3339))
163165
notebooksAPI := workspace.NewNotebooksAPI(ic.Context, ic.Client)
164-
ic.allWorkspaceObjects, _ = notebooksAPI.List("/", true, true)
166+
ic.allWorkspaceObjects, _ = notebooksAPI.ListParallel("/", true)
167+
t2 := time.Now()
168+
log.Printf("[DEBUG] %v. Finished listing of all workspace objects. %d objects in total. %v seconds",
169+
t2.Local().Format(time.RFC3339), len(ic.allWorkspaceObjects), t2.Sub(t1).Seconds())
165170
}
166171
return ic.allWorkspaceObjects
167172
}
168173

169174
func (ic *importContext) emitGroups(u scim.User, principal string) {
170175
for _, g := range u.Groups {
171176
if g.Type != "direct" {
172-
log.Printf("Skipping non-direct group %s/%s for %s", g.Value, g.Display, principal)
177+
log.Printf("[DEBUG] Skipping non-direct group %s/%s for %s", g.Value, g.Display, principal)
173178
continue
174179
}
175180
ic.Emit(&resource{

workspace/resource_notebook.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ package workspace
33
import (
44
"context"
55
"encoding/base64"
6+
"log"
7+
"os"
68
"path/filepath"
9+
"strconv"
710
"strings"
811
"sync"
12+
"time"
913

1014
"github.com/databricks/terraform-provider-databricks/common"
1115

@@ -139,6 +143,99 @@ func (a NotebooksAPI) Mkdirs(path string) error {
139143
}, nil)
140144
}
141145

146+
type syncAnswer struct {
147+
MU sync.Mutex
148+
data []ObjectStatus
149+
}
150+
151+
func (a *syncAnswer) append(objs []ObjectStatus) {
152+
a.MU.Lock()
153+
a.data = append(a.data, objs...)
154+
a.MU.Unlock()
155+
}
156+
157+
type directoryInfo struct {
158+
Path string
159+
Attempts int
160+
}
161+
162+
// constants related to the parallel listing
163+
const (
164+
directoryListingMaxAttempts = 3
165+
envVarListParallelism = "EXPORTER_WS_LIST_PARALLELISM"
166+
envVarDirectoryChannelSize = "EXPORTER_CHANNEL_SIZE"
167+
defaultWorkersPoolSize = 5
168+
defaultDirectoryChannelSize = 100000
169+
)
170+
171+
func getFormattedNowTime() string {
172+
return time.Now().Local().Format(time.RFC3339Nano)
173+
}
174+
175+
func (a NotebooksAPI) recursiveAddPathsParallel(directory directoryInfo, dirChannel chan directoryInfo,
176+
answer *syncAnswer, wg *sync.WaitGroup) {
177+
defer wg.Done()
178+
notebookInfoList, err := a.list(directory.Path)
179+
if err != nil {
180+
log.Printf("[WARN] error listing '%s': %v", directory.Path, err)
181+
if directory.Attempts < directoryListingMaxAttempts {
182+
wg.Add(1)
183+
dirChannel <- directoryInfo{Path: directory.Path, Attempts: directory.Attempts + 1}
184+
}
185+
}
186+
answer.append(notebookInfoList)
187+
for _, v := range notebookInfoList {
188+
if v.ObjectType == Directory {
189+
wg.Add(1)
190+
log.Printf("[DEBUG] %s: putting directory '%s' into channel. Channel size: %d",
191+
getFormattedNowTime(), v.Path, len(dirChannel))
192+
dirChannel <- directoryInfo{Path: v.Path}
193+
time.Sleep(15 * time.Millisecond)
194+
}
195+
}
196+
}
197+
198+
func getEnvAsInt(envName string, defaultValue int) int {
199+
if val, exists := os.LookupEnv(envName); exists {
200+
parsedVal, err := strconv.Atoi(val)
201+
if err == nil {
202+
return parsedVal
203+
}
204+
}
205+
return defaultValue
206+
}
207+
208+
func (a NotebooksAPI) ListParallel(path string, recursive bool) ([]ObjectStatus, error) {
209+
var answer syncAnswer
210+
wg := &sync.WaitGroup{}
211+
212+
numWorkers := getEnvAsInt(envVarListParallelism, defaultWorkersPoolSize)
213+
channelSize := getEnvAsInt(envVarDirectoryChannelSize, defaultDirectoryChannelSize)
214+
dirChannel := make(chan directoryInfo, channelSize)
215+
for i := 0; i < numWorkers; i++ {
216+
t := i
217+
go func() {
218+
log.Printf("[DEBUG] %s: starting go routine %d", getFormattedNowTime(), t)
219+
for directory := range dirChannel {
220+
log.Printf("[DEBUG] %s: processing directory %s", getFormattedNowTime(), directory.Path)
221+
a.recursiveAddPathsParallel(directory, dirChannel, &answer, wg)
222+
}
223+
}()
224+
225+
}
226+
log.Printf("[DEBUG] %s: pushing initial path to channel", getFormattedNowTime())
227+
wg.Add(1)
228+
a.recursiveAddPathsParallel(directoryInfo{Path: path}, dirChannel, &answer, wg)
229+
log.Printf("[DEBUG] %s: starting to wait", getFormattedNowTime())
230+
wg.Wait()
231+
log.Printf("[DEBUG] %s: closing the directory channel", getFormattedNowTime())
232+
close(dirChannel)
233+
234+
answer.MU.Lock()
235+
defer answer.MU.Unlock()
236+
return answer.data, nil
237+
}
238+
142239
// List will list all objects in a path on the workspace
143240
// and with the recursive flag it will recursively list
144241
// all the objects

workspace/resource_notebook_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package workspace
22

33
import (
4+
"context"
45
"net/http"
6+
"os"
57
"testing"
68

79
"github.com/databricks/databricks-sdk-go/apierr"
810
"github.com/databricks/terraform-provider-databricks/qa"
911

1012
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
1114
)
1215

1316
func TestResourceNotebookRead(t *testing.T) {
@@ -376,3 +379,66 @@ func TestNotebookLanguageSuppressSourceDiff(t *testing.T) {
376379
suppress := r.Schema["language"].DiffSuppressFunc
377380
assert.True(t, suppress("language", Python, Python, d))
378381
}
382+
383+
func TestParallelListing(t *testing.T) {
384+
client, server, err := qa.HttpFixtureClient(t, []qa.HTTPFixture{
385+
{
386+
Method: "GET",
387+
Resource: "/api/2.0/workspace/list?path=%2F",
388+
Response: ObjectList{
389+
Objects: []ObjectStatus{
390+
{
391+
ObjectID: 1,
392+
ObjectType: Directory,
393+
Path: "/a",
394+
},
395+
{
396+
ObjectID: 2,
397+
ObjectType: Directory,
398+
Path: "/b",
399+
},
400+
},
401+
},
402+
},
403+
{
404+
Method: "GET",
405+
Resource: "/api/2.0/workspace/list?path=%2Fa",
406+
Response: ObjectList{
407+
Objects: []ObjectStatus{
408+
{
409+
ObjectID: 3,
410+
ObjectType: Notebook,
411+
Language: Python,
412+
Path: "/a/e",
413+
},
414+
},
415+
},
416+
},
417+
{
418+
Method: "GET",
419+
Resource: "/api/2.0/workspace/list?path=%2Fb",
420+
Response: ObjectList{
421+
Objects: []ObjectStatus{
422+
{
423+
ObjectID: 4,
424+
ObjectType: Notebook,
425+
Language: SQL,
426+
Path: "/b/c",
427+
},
428+
},
429+
},
430+
},
431+
})
432+
defer server.Close()
433+
require.NoError(t, err)
434+
435+
os.Setenv("EXPORTER_WS_LIST_PARALLLELISM", "2")
436+
os.Setenv("EXPORTER_CHANNEL_SIZE", "100")
437+
ctx := context.Background()
438+
api := NewNotebooksAPI(ctx, client)
439+
objects, err := api.ListParallel("/", true)
440+
441+
require.NoError(t, err)
442+
require.Equal(t, 4, len(objects))
443+
444+
}

0 commit comments

Comments
 (0)