Skip to content

Commit 82f2321

Browse files
committed
sql/import: pass deep copy of BulkOpSummary to progress
Fixes: #153480 Prior to this change, we passed the `BulkOpSummary` map directly from `accumulatedBulkSummary` to `prog.Summary` in #152745. This caused a `concurrent map iteration and map write` panic because the map was being updated via `metaFn` while simultaneously being marshaled into a protobuf during `jobs.Update` calls from `FractionProgressed`. This change creates a deep copy of the map when assigning from `accumulatedBulkSummary` to `prog.Summary`, eliminating the concurrency issue by ensuring each goroutine operates on separate map instances. Release Notes: None
1 parent 755c186 commit 82f2321

File tree

3 files changed

+17
-1
lines changed

3 files changed

+17
-1
lines changed

pkg/kv/kvpb/api.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2133,6 +2133,21 @@ func (b *BulkOpSummary) Add(other BulkOpSummary) {
21332133
}
21342134
}
21352135

2136+
// DeepCopy returns a deep copy of the original BulkOpSummary.
2137+
func (b *BulkOpSummary) DeepCopy() BulkOpSummary {
2138+
cpy := BulkOpSummary{
2139+
DataSize: b.DataSize,
2140+
SSTDataSize: b.SSTDataSize,
2141+
}
2142+
if b.EntryCounts != nil {
2143+
cpy.EntryCounts = make(map[uint64]int64, len(b.EntryCounts))
2144+
for k, v := range b.EntryCounts {
2145+
cpy.EntryCounts[k] = v
2146+
}
2147+
}
2148+
return cpy
2149+
}
2150+
21362151
// MustSetValue is like SetValue, except it resets the enum and panics if the
21372152
// provided value is not a valid variant type.
21382153
func (e *RangeFeedEvent) MustSetValue(value interface{}) {

pkg/kv/kvpb/api.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1935,6 +1935,7 @@ message BulkOpSummary {
19351935
// generation logic is also available in the BulkOpSummaryID helper. It does
19361936
// not take MVCC range tombstones into account.
19371937
map<uint64, int64> entry_counts = 5;
1938+
// Please update BulkOpSummary.DeepCopy() if new fields are added.
19381939
}
19391940

19401941
// ExportResponse is the response to an Export() operation.

pkg/sql/importer/import_processor_planning.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func distImport(
176176
}
177177

178178
accumulatedBulkSummary.Lock()
179-
prog.Summary = accumulatedBulkSummary.BulkOpSummary
179+
prog.Summary = accumulatedBulkSummary.BulkOpSummary.DeepCopy()
180180
accumulatedBulkSummary.Unlock()
181181
return overall / float32(len(from))
182182
},

0 commit comments

Comments
 (0)