
Commit 3f8144f

Exporter: Add retries for Search, ReadContext and Import operations when importing the resource (#3202)
* Exporter: Add retries for Search/ReadContext and Import operations when importing the resource

  Under very high load, the Databricks backend may not answer in time, or may return specific transient errors, so it makes sense to retry the operation a few times. This PR uses a "naive" implementation; I need to play a bit more with the `retries` package before adopting it.

* Review comments

* Code refactoring, reworking retries for directory listings as well

* Adjust logging levels
1 parent 43861e1 commit 3f8144f
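
The retry pattern is easiest to see in isolation. The following is a minimal, self-contained sketch of the approach this commit takes — an error-only simplification of the generic runWithRetries helper added in exporter/util.go below. The retry function, the flakyCall closure, and main are illustrative only; the attempt cap lives in the loop here, whereas the commit folds it into isRetryableError:

    package main

    import (
        "fmt"
        "log"
        "strings"
        "time"
    )

    // Illustrative constants; the commit uses the same values.
    const (
        maxRetries        = 5
        retryDelaySeconds = 2
    )

    // Substrings of errors considered transient, mirroring retriableErrors in the commit.
    var retriableErrors = []string{"context deadline exceeded", "Error handling request", "Timed out after "}

    func isRetryable(err error) bool {
        for _, msg := range retriableErrors {
            if strings.Contains(err.Error(), msg) {
                return true
            }
        }
        return false
    }

    // retry runs runFunc up to maxRetries times, sleeping with an exponentially
    // growing delay between attempts, and fails fast on non-transient errors.
    func retry(runFunc func() error, msg string) error {
        var err error
        delay := 1
        for i := 0; i < maxRetries; i++ {
            if err = runFunc(); err == nil {
                return nil
            }
            if !isRetryable(err) {
                return err // permanent error: do not retry
            }
            delay = delay * retryDelaySeconds
            log.Printf("[INFO] next retry (%d) for %s after %d seconds", i+1, msg, delay)
            time.Sleep(time.Duration(delay) * time.Second)
        }
        return err
    }

    func main() {
        attempts := 0
        // flakyCall stands in for Search/ReadContext/Import: fails twice, then succeeds.
        flakyCall := func() error {
            attempts++
            if attempts < 3 {
                return fmt.Errorf("context deadline exceeded (attempt %d)", attempts)
            }
            return nil
        }
        if err := retry(flakyCall, "searching of example"); err != nil {
            log.Fatalf("[ERROR] %v", err)
        }
        fmt.Printf("succeeded after %d attempts\n", attempts)
    }

The real helper is generic over the returned failure type, so the same wrapper covers both error and diag.Diagnostics; see the exporter/util.go diff.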

File tree

exporter/context.go
exporter/context_test.go
exporter/model.go
exporter/util.go

4 files changed: +103 −7 lines changed


exporter/context.go

Lines changed: 1 addition & 1 deletion
@@ -758,7 +758,7 @@ func (ic *importContext) generateAndWriteResources(sh *os.File) {
 	for i, r := range resources {
 		ic.waitGroup.Add(1)
 		resourcesChan <- r
-		if i%50 == 0 {
+		if i%500 == 0 {
 			log.Printf("[INFO] Submitted %d of %d resources", i+1, scopeSize)
 		}
 	}

exporter/context_test.go

Lines changed: 44 additions & 0 deletions
@@ -201,6 +201,50 @@ func TestEmitNoSearchNoId(t *testing.T) {
 	close(ch)
 }
 
+func TestEmitNoSearchNoIdWithRetry(t *testing.T) {
+	ch := make(resourceChannel, 10)
+	state := newStateApproximation([]string{"a"})
+	i := 0
+	ic := &importContext{
+		importing: map[string]bool{},
+		Resources: map[string]*schema.Resource{
+			"a": {},
+		},
+		Importables: map[string]importable{
+			"a": {
+				Service: "e",
+				Search: func(ic *importContext, r *resource) error {
+					if i > 0 {
+						return nil
+					}
+					i = i + 1
+					return fmt.Errorf("context deadline exceeded (Client.Timeout exceeded while awaiting headers)")
+				},
+			},
+		},
+		waitGroup: &sync.WaitGroup{},
+		channels: map[string]resourceChannel{
+			"a": ch,
+		},
+		ignoredResources: map[string]struct{}{},
+		State:            state,
+	}
+	ic.enableServices("e")
+	go func() {
+		for r := range ch {
+			r.ImportResource(ic)
+		}
+	}()
+	ic.Emit(&resource{
+		Resource:  "a",
+		Attribute: "b",
+		Value:     "d",
+		Name:      "c",
+	})
+	ic.waitGroup.Wait()
+	close(ch)
+}
+
 func TestEmitNoSearchSucceedsImportFails(t *testing.T) {
 	ch := make(resourceChannel, 10)
 	state := newStateApproximation([]string{"a"})
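
For reference, the error returned by the mock Search above is exactly the kind the new retry logic treats as transient: it contains the "context deadline exceeded" substring listed in retriableErrors (added in exporter/util.go below), so the first failure is retried and the second call succeeds. An illustrative check, not part of the test:

    // The mock's error message matches a retriable substring.
    msg := "context deadline exceeded (Client.Timeout exceeded while awaiting headers)"
    fmt.Println(strings.Contains(msg, "context deadline exceeded")) // prints: true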

exporter/model.go

Lines changed: 17 additions & 4 deletions
@@ -12,6 +12,7 @@ import (
 	"github.com/databricks/terraform-provider-databricks/common"
 
 	"github.com/hashicorp/hcl/v2/hclwrite"
+	"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
 )
@@ -265,8 +266,12 @@ func (r *resource) ImportResource(ic *importContext) {
 			log.Printf("[ERROR] Searching %s is not available", r)
 			return
 		}
-		if err := ir.Search(ic, r); err != nil {
-			log.Printf("[ERROR] Cannot search for a resource %s: %v", err, r)
+		err := runWithRetries(func() error {
+			return ir.Search(ic, r)
+		},
+			fmt.Sprintf("searching of %v", r))
+		if err != nil {
+			log.Printf("[ERROR] Error searching %s#%s: %v", r.Resource, r.ID, err)
 			return
 		}
 		if r.ID == "" {
@@ -288,7 +293,11 @@ func (r *resource) ImportResource(ic *importContext) {
 	if apiVersion != "" {
 		ctx = context.WithValue(ctx, common.Api, apiVersion)
 	}
-	if dia := pr.ReadContext(ctx, r.Data, ic.Client); dia != nil {
+	dia := runWithRetries(func() diag.Diagnostics {
+		return pr.ReadContext(ctx, r.Data, ic.Client)
+	},
+		fmt.Sprintf("reading %s#%s", r.Resource, r.ID))
+	if dia != nil {
 		log.Printf("[ERROR] Error reading %s#%s: %v", r.Resource, r.ID, dia)
 		return
 	}
@@ -298,7 +307,11 @@ func (r *resource) ImportResource(ic *importContext) {
 	}
 	r.Name = ic.ResourceName(r)
 	if ir.Import != nil {
-		if err := ir.Import(ic, r); err != nil {
+		err := runWithRetries(func() error {
+			return ir.Import(ic, r)
+		},
+			fmt.Sprintf("importing of %s#%s", r.Resource, r.ID))
+		if err != nil {
 			log.Printf("[ERROR] Failed custom import of %s: %s", r, err)
 			return
 		}

exporter/util.go

Lines changed: 41 additions & 2 deletions
@@ -1088,7 +1088,6 @@ type directoryInfo struct {
 
 // constants related to the parallel listing
 const (
-	directoryListingMaxAttempts = 3
 	envVarListParallelism      = "EXPORTER_WS_LIST_PARALLELISM"
 	envVarDirectoryChannelSize = "EXPORTER_DIRECTORIES_CHANNEL_SIZE"
 	defaultWorkersPoolSize     = 10
@@ -1101,8 +1100,11 @@ func recursiveAddPathsParallel(a workspace.NotebooksAPI, directory directoryInfo
 	notebookInfoList, err := a.ListInternalImpl(directory.Path)
 	if err != nil {
 		log.Printf("[WARN] error listing '%s': %v", directory.Path, err)
-		if directory.Attempts < directoryListingMaxAttempts {
+		if isRetryableError(err.Error(), directory.Attempts) {
 			wg.Add(1)
+			log.Printf("[INFO] attempt %d of retrying listing of '%s' after error: %v",
+				directory.Attempts+1, directory.Path, err)
+			time.Sleep(time.Duration(retryDelaySeconds) * time.Second)
 			dirChannel <- directoryInfo{Path: directory.Path, Attempts: directory.Attempts + 1}
 		}
 	}
@@ -1166,3 +1168,40 @@ func ListParallel(a workspace.NotebooksAPI, path string, shouldIncludeDir func(w
 	defer answer.MU.Unlock()
 	return answer.data, nil
 }
+
+var (
+	maxRetries        = 5
+	retryDelaySeconds = 2
+	retriableErrors   = []string{"context deadline exceeded", "Error handling request", "Timed out after "}
+)
+
+func isRetryableError(err string, i int) bool {
+	if i < (maxRetries - 1) {
+		for _, msg := range retriableErrors {
+			if strings.Contains(err, msg) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func runWithRetries[ERR any](runFunc func() ERR, msg string) ERR {
+	var err ERR
+	delay := 1
+	for i := 0; i < maxRetries; i++ {
+		err = runFunc()
+		valOf := reflect.ValueOf(&err).Elem()
+		if valOf.IsNil() || valOf.IsZero() {
+			break
+		}
+		if !isRetryableError(fmt.Sprintf("%v", err), i) {
+			log.Printf("[ERROR] Error %s after %d retries: %v", msg, i, err)
+			return err
+		}
+		delay = delay * retryDelaySeconds
+		log.Printf("[INFO] next retry (%d) for %s after %d seconds", (i + 1), msg, delay)
+		time.Sleep(time.Duration(delay) * time.Second)
+	}
+	return err
+}
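
A usage note, not part of the diff: runWithRetries is generic over the returned failure type because ir.Search and ir.Import return error while pr.ReadContext returns diag.Diagnostics, and Go does not allow comparing a value of a type parameter constrained only by any against nil — hence the reflect-based nil/zero check for "success". The two instantiations at the call sites in exporter/model.go above look like this:

    // ERR is inferred as error:
    err := runWithRetries(func() error {
        return ir.Search(ic, r)
    }, fmt.Sprintf("searching of %v", r))

    // ERR is inferred as diag.Diagnostics; a nil Diagnostics means success:
    dia := runWithRetries(func() diag.Diagnostics {
        return pr.ReadContext(ctx, r.Data, ic.Client)
    }, fmt.Sprintf("reading %s#%s", r.Resource, r.ID))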
