Skip to content

Commit 2c13e8d

Browse files
authored
[Exporter] Improving reliability of Emit function (#4163)
## Changes <!-- Summary of your changes that are easy to understand --> I found in the large-scale testing that sometimes we don't handle emitting of the same resource reliably, and this may lead to generation of duplicate resources (very small amount, but still) - found this in a very specific case when notebooks were listed without directories. This PR fixes this problem: - by tracking if resource is already in importing queue - detecting duplicates during code generation It also may improve performance a bit (2-3%). ## Tests <!-- How is this tested? Please see the checklist below and also describe any other relevant tests --> - [x] `make test` run locally - [ ] relevant change in `docs/` folder - [ ] covered with integration tests in `internal/acceptance` - [ ] relevant acceptance tests are passing - [ ] using Go SDK
1 parent 948bf08 commit 2c13e8d

File tree

2 files changed

+43
-23
lines changed

2 files changed

+43
-23
lines changed

exporter/codegen.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -904,22 +904,27 @@ func (ic *importContext) handleResourceWrite(generatedFile string, ch dataWriteC
904904
return
905905
}
906906

907-
//
908907
newResources := make(map[string]struct{}, 100)
909908
log.Printf("[DEBUG] started processing new writes for %s", generatedFile)
910909
for f := range ch {
911910
if f != nil {
912-
log.Printf("[DEBUG] started writing resource body for %s", f.BlockName)
913-
_, err = tf.WriteString(f.ResourceBody)
914-
if err == nil {
915-
newResources[f.BlockName] = struct{}{}
916-
if f.ImportCommand != "" {
917-
ic.waitGroup.Add(1)
918-
importChan <- f.ImportCommand
911+
// check if we have the same blockname already written. To avoid duplicates
912+
_, exists := newResources[f.BlockName]
913+
if !exists {
914+
log.Printf("[DEBUG] started writing resource body for %s", f.BlockName)
915+
_, err = tf.WriteString(f.ResourceBody)
916+
if err == nil {
917+
newResources[f.BlockName] = struct{}{}
918+
if f.ImportCommand != "" {
919+
ic.waitGroup.Add(1)
920+
importChan <- f.ImportCommand
921+
}
922+
log.Printf("[DEBUG] finished writing resource body for %s", f.BlockName)
923+
} else {
924+
log.Printf("[ERROR] Error when writing to %s: %v", generatedFile, err)
919925
}
920-
log.Printf("[DEBUG] finished writing resource body for %s", f.BlockName)
921926
} else {
922-
log.Printf("[ERROR] Error when writing to %s: %v", generatedFile, err)
927+
log.Printf("[WARN] Found duplicate resource: '%s'", f.BlockName)
923928
}
924929
} else {
925930
log.Print("[WARN] got nil as resourceWriteData!")

exporter/context.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ var goroutinesNumber = map[string]int{
204204
"databricks_sql_dashboard": 3,
205205
"databricks_sql_widget": 4,
206206
"databricks_sql_visualization": 4,
207-
"databricks_query": 4,
207+
"databricks_query": 6,
208208
"databricks_alert": 2,
209209
"databricks_permissions": 11,
210210
}
@@ -615,17 +615,20 @@ func (ic *importContext) HasInState(r *resource, onlyAdded bool) bool {
615615
return ic.State.Has(r)
616616
}
617617

618-
func (ic *importContext) setImportingState(s string, state bool) {
619-
ic.importingMutex.Lock()
620-
defer ic.importingMutex.Unlock()
621-
ic.importing[s] = state
622-
}
623-
624618
func (ic *importContext) Add(r *resource) {
625619
if ic.HasInState(r, true) { // resource must exist and already marked as added
626620
return
627621
}
628-
ic.setImportingState(r.String(), true) // mark resource as added
622+
rString := r.String()
623+
ic.importingMutex.Lock()
624+
_, ok := ic.importing[rString]
625+
if ok {
626+
ic.importingMutex.Unlock()
627+
log.Printf("[DEBUG] %s already being added", rString)
628+
return
629+
}
630+
ic.importing[rString] = true // mark resource as added
631+
ic.importingMutex.Unlock()
629632
state := r.Data.State()
630633
if state == nil {
631634
log.Printf("[ERROR] state is nil for %s", r)
@@ -648,7 +651,6 @@ func (ic *importContext) Add(r *resource) {
648651
Instances: []instanceApproximation{inst},
649652
Resource: r,
650653
})
651-
// in single-threaded scenario scope is toposorted
652654
ic.Scope.Append(r)
653655
}
654656

@@ -727,14 +729,25 @@ func (ic *importContext) Emit(r *resource) {
727729
log.Printf("[DEBUG] %s already imported", r)
728730
return
729731
}
732+
rString := r.String()
730733
if ic.testEmits != nil {
731734
log.Printf("[INFO] %s is emitted in test mode", r)
732735
ic.testEmitsMutex.Lock()
733-
ic.testEmits[r.String()] = true
736+
ic.testEmits[rString] = true
734737
ic.testEmitsMutex.Unlock()
735738
return
736739
}
737-
ic.setImportingState(r.String(), false) // we're starting to add a new resource
740+
// we need to check that we're not importing the same resource twice - this may happen under high concurrency
741+
// for specific resources, for example, directories when they aren't part of the listing
742+
ic.importingMutex.Lock()
743+
res, ok := ic.importing[rString]
744+
if ok {
745+
ic.importingMutex.Unlock()
746+
log.Printf("[DEBUG] %s already being imported: %v", rString, res)
747+
return
748+
}
749+
ic.importing[rString] = false // // we're starting to add a new resource
750+
ic.importingMutex.Unlock()
738751
_, ok = ic.Resources[r.Resource]
739752
if !ok {
740753
log.Printf("[ERROR] %s is not available in provider", r)
@@ -745,8 +758,10 @@ func (ic *importContext) Emit(r *resource) {
745758
log.Printf("[DEBUG] %s (%s service) is not part of the account level export", r.Resource, ir.Service)
746759
return
747760
}
748-
// TODO: add similar condition for checking workspace-level objects only. After new ACLs import is merged
749-
761+
if !ic.accountLevel && !ir.WorkspaceLevel {
762+
log.Printf("[DEBUG] %s (%s service) is not part of the workspace level export", r.Resource, ir.Service)
763+
return
764+
}
750765
// from here, it should be done by the goroutine... send resource into the channel
751766
ch, exists := ic.channels[r.Resource]
752767
if exists {

0 commit comments

Comments
 (0)