Skip to content

Commit 03ca1db

Browse files
committed
fixes
1 parent 7497f46 commit 03ca1db

File tree

5 files changed

+14
-14
lines changed

5 files changed

+14
-14
lines changed

internal/documentmap/documentmap.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ func (m *Map) ImportFromCursor(ctx context.Context, cursor *mongo.Cursor, tracke
126126

127127
m.bytesSize = bytesReturned
128128

129-
m.logger.Debug().
129+
m.logger.Info().
130130
Int("documentedReturned", nDocumentsReturned).
131-
Str("totalSize", reportutils.BytesToUnit(bytesReturned, reportutils.FindBestUnit(bytesReturned))).
131+
Str("totalSize", reportutils.FmtBytes(bytesReturned)).
132132
Msgf("Finished reading %#q query.", "find")
133133

134134
return nil

internal/memorytracker/memorytracker.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ type Tracker struct {
2323
}
2424

2525
func Start(ctx context.Context, logger *logger.Logger, max Unit) *Tracker {
26-
tracker := Tracker{max: max}
26+
tracker := Tracker{
27+
max: max,
28+
logger: logger,
29+
}
2730

2831
go tracker.track(ctx)
2932

@@ -38,7 +41,7 @@ func (mt *Tracker) AddWriter() Writer {
3841

3942
mt.selectCases = append(mt.selectCases, reflect.SelectCase{
4043
Dir: reflect.SelectRecv,
41-
Chan: reflect.ValueOf(newChan),
44+
Chan: reflect.ValueOf(reader(newChan)),
4245
})
4346

4447
return newChan
@@ -70,7 +73,7 @@ func (mt *Tracker) removeSelectCase(i int) {
7073

7174
func (mt *Tracker) track(ctx context.Context) {
7275
for {
73-
if mt.cur <= mt.max {
76+
if mt.cur > mt.max {
7477
mt.logger.Panic().
7578
Int64("usage", mt.cur).
7679
Int64("softLimit", mt.max).
@@ -98,11 +101,7 @@ func (mt *Tracker) track(ctx context.Context) {
98101
Msg("Reclaimed tracked memory.")
99102
}
100103

101-
if alive {
102-
if got == 0 {
103-
mt.logger.Panic().Msg("Got zero track value but channel is not closed.")
104-
}
105-
} else {
104+
if !alive {
106105
if got != 0 {
107106
mt.logger.Panic().
108107
Int64("receivedValue", got).

internal/reportutils/reportutils.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ func DurationToHMS(duration time.Duration) string {
7070
// FmtBytes is a convenience that combines BytesToUnit with FindBestUnit.
7171
// Use it to format a single count of bytes.
7272
func FmtBytes[T num16Plus](count T) string {
73-
return BytesToUnit(count, FindBestUnit(count))
73+
unit := FindBestUnit(count)
74+
return BytesToUnit(count, unit) + " " + string(unit)
7475
}
7576

7677
// BytesToUnit returns a stringified number that represents `count`

internal/verifier/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (verifier *Verifier) waitForChangeStream() error {
6363

6464
func (verifier *Verifier) CheckWorker(ctx context.Context) error {
6565
verifier.logger.Debug().Msgf("Starting %d verification workers", verifier.numWorkers)
66-
memTracker := memorytracker.Start(ctx, verifier.logger, 40_000_000) // TODO
66+
memTracker := memorytracker.Start(ctx, verifier.logger, 40_000_000_000) // TODO
6767
ctx, cancel := context.WithCancel(ctx)
6868
wg := sync.WaitGroup{}
6969
for i := 0; i < verifier.numWorkers; i++ {

internal/verifier/migration_verifier.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,8 +504,8 @@ func (verifier *Verifier) fetchDocuments(task *VerificationTask, trackerWriter m
504504

505505
if err == nil && int64(srcClientMap.TotalDocsBytes()) > warnThreshold {
506506
verifier.logger.Warn().
507-
Str("totalSize", reportutils.BytesToUnit(srcClientMap.TotalDocsBytes(), reportutils.FindBestUnit(srcClientMap.TotalDocsBytes()))).
508-
Str("intendedPartitionSize", reportutils.BytesToUnit(verifier.partitionSizeInBytes, reportutils.FindBestUnit(verifier.partitionSizeInBytes))).
507+
Str("totalSize", reportutils.FmtBytes(srcClientMap.TotalDocsBytes())).
508+
Str("intendedPartitionSize", reportutils.FmtBytes(verifier.partitionSizeInBytes)).
509509
Str("filter", fmt.Sprintf("%v", findCmd)).
510510
Msg("Partition greatly exceeds desired size. This may cause excess memory usage.")
511511
}

0 commit comments

Comments
 (0)