Skip to content

Commit 9cfddbb

Browse files
committed
seems maybe ok
1 parent 03ca1db commit 9cfddbb

File tree

2 files changed

+22
-19
lines changed

2 files changed

+22
-19
lines changed

internal/documentmap/documentmap.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,13 @@ func (m *Map) ImportFromCursor(ctx context.Context, cursor *mongo.Cursor, tracke
115115
return err
116116
}
117117

118-
nDocumentsReturned++
119-
bytesReturned += (types.ByteCount)(len(cursor.Current))
118+
docSize := (types.ByteCount)(len(cursor.Current))
120119

121120
// This will block if needs be to prevent OOMs.
122-
trackerWriter <- memorytracker.Unit(bytesReturned)
121+
trackerWriter <- memorytracker.Unit(docSize)
122+
123+
bytesReturned += docSize
124+
nDocumentsReturned++
123125

124126
m.copyAndAddDocument(cursor.Current)
125127
}

internal/memorytracker/memorytracker.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@ type Writer = chan<- Unit
1616

1717
type Tracker struct {
1818
logger *logger.Logger
19-
max Unit
20-
cur Unit
19+
softLimit Unit
20+
curUsage Unit
2121
selectCases []reflect.SelectCase
2222
mux sync.RWMutex
2323
}
2424

2525
func Start(ctx context.Context, logger *logger.Logger, max Unit) *Tracker {
2626
tracker := Tracker{
27-
max: max,
28-
logger: logger,
27+
softLimit: max,
28+
logger: logger,
2929
}
3030

3131
go tracker.track(ctx)
@@ -68,15 +68,15 @@ func (mt *Tracker) removeSelectCase(i int) {
6868
mt.mux.Lock()
6969
defer mt.mux.Unlock()
7070

71-
mt.selectCases = slices.Delete(mt.selectCases, i, 1+i)
71+
mt.selectCases = slices.Delete(mt.selectCases, 1+i, 2+i)
7272
}
7373

7474
func (mt *Tracker) track(ctx context.Context) {
7575
for {
76-
if mt.cur > mt.max {
76+
if mt.curUsage > mt.softLimit {
7777
mt.logger.Panic().
78-
Int64("usage", mt.cur).
79-
Int64("softLimit", mt.max).
78+
Int64("usage", mt.curUsage).
79+
Int64("softLimit", mt.softLimit).
8080
Msg("track() loop should never be in memory excess!")
8181
}
8282

@@ -93,11 +93,12 @@ func (mt *Tracker) track(ctx context.Context) {
9393
}
9494

9595
got := (gotVal.Interface()).(Unit)
96-
mt.cur += got
96+
mt.curUsage += got
9797

9898
if got < 0 {
99-
mt.logger.Debug().
99+
mt.logger.Info().
100100
Str("reclaimed", reportutils.FmtBytes(-got)).
101+
Str("tracked", reportutils.FmtBytes(mt.curUsage)).
101102
Msg("Reclaimed tracked memory.")
102103
}
103104

@@ -117,20 +118,20 @@ func (mt *Tracker) track(ctx context.Context) {
117118

118119
didSingleThread := false
119120

120-
for mt.cur > mt.max {
121+
for mt.curUsage > mt.softLimit {
121122
reader := (selectCases[chosen].Chan.Interface()).(reader)
122123

123124
if !didSingleThread {
124125
mt.logger.Warn().
125-
Str("usage", reportutils.FmtBytes(mt.cur)).
126-
Str("softLimit", reportutils.FmtBytes(mt.max)).
126+
Str("usage", reportutils.FmtBytes(mt.curUsage)).
127+
Str("softLimit", reportutils.FmtBytes(mt.softLimit)).
127128
Msg("Tracked memory usage now exceeds soft limit. Suspending concurrent reads until tracked usage falls.")
128129

129130
didSingleThread = true
130131
}
131132

132133
got, alive := <-reader
133-
mt.cur += got
134+
mt.curUsage += got
134135

135136
if !alive {
136137
mt.removeSelectCase(chosen)
@@ -139,8 +140,8 @@ func (mt *Tracker) track(ctx context.Context) {
139140

140141
if didSingleThread {
141142
mt.logger.Info().
142-
Str("usage", reportutils.FmtBytes(mt.cur)).
143-
Str("softLimit", reportutils.FmtBytes(mt.max)).
143+
Str("usage", reportutils.FmtBytes(mt.curUsage)).
144+
Str("softLimit", reportutils.FmtBytes(mt.softLimit)).
144145
Msg("Tracked memory usage is now below soft limit. Resuming concurrent reads.")
145146
}
146147
}

0 commit comments

Comments
 (0)