Skip to content

Commit 0615759

Browse files
GODRIVER-3173 Clean up code
1 parent 9cc397a commit 0615759

File tree

3 files changed

+15
-17
lines changed

3 files changed

+15
-17
lines changed

internal/integration/mtest/mongotest.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,11 @@ func (t *T) TrackFailPoint(fpName string) {
564564

565565
// ClearFailPoints disables all previously set failpoints for this test.
566566
func (t *T) ClearFailPoints() {
567+
// Run some arbitrary command to ensure that any connection that would
568+
// otherwise blocking during a pending read is closed. This could happen if
569+
// the mode times > 1 and the blocking time is > default pending read timeout.
570+
_ = t.Client.Ping(context.Background(), nil)
571+
567572
db := t.Client.Database("admin")
568573
for _, fp := range t.failPointNames {
569574
cmd := failpoint.FailPoint{

x/mongo/driver/topology/connection.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -491,13 +491,11 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
491491
if l := int32(n); l == 0 && isCSOTTimeout(err) && driverutil.HasMaxTimeMS(ctx) {
492492
requestID, _ := driverutil.GetRequestID(ctx)
493493

494-
//c.pendingReadMu.Lock()
495494
c.pendingReadState = &pendingReadState{
496495
remainingBytes: l,
497496
requestID: requestID,
498497
start: time.Now(),
499498
}
500-
//c.pendingReadMu.Unlock()
501499
}
502500
return nil, "incomplete read of message header", err
503501
}
@@ -515,13 +513,11 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
515513
if remainingBytes > 0 && isCSOTTimeout(err) && driverutil.HasMaxTimeMS(ctx) {
516514
requestID, _ := driverutil.GetRequestID(ctx)
517515

518-
//c.pendingReadMu.Lock()
519516
c.pendingReadState = &pendingReadState{
520517
remainingBytes: remainingBytes,
521518
requestID: requestID,
522519
start: time.Now(),
523520
}
524-
//c.pendingReadMu.Unlock()
525521
}
526522
return dst, "incomplete read of full message", err
527523
}

x/mongo/driver/topology/pool.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -907,7 +907,9 @@ func newPendingReadContext(parent context.Context, remainingTime time.Duration)
907907
// buffered reader. If the connection is closed, it will return false.
908908
func peekConnectionAlive(conn *connection) (int, error) {
909909
// Set a very short deadline to avoid blocking.
910-
conn.nc.SetReadDeadline(time.Now().Add(1 * time.Nanosecond))
910+
if err := conn.nc.SetReadDeadline(time.Now().Add(1 * time.Nanosecond)); err != nil {
911+
return 0, err
912+
}
911913

912914
// Wrap the connection in a buffered reader to use peek.
913915
reader := bufio.NewReader(conn.nc)
@@ -927,6 +929,8 @@ func attemptPendingRead(ctx context.Context, conn *connection, remainingTime tim
927929
calculatedDeadline := time.Now().Add(remainingTime)
928930

929931
if contextDeadlineUsed {
932+
// Use the minimum of the user-provided deadline and the calculated
933+
// deadline.
930934
if calculatedDeadline.Before(dl) {
931935
dl = calculatedDeadline
932936
}
@@ -941,7 +945,6 @@ func attemptPendingRead(ctx context.Context, conn *connection, remainingTime tim
941945

942946
size := pendingreadState.remainingBytes
943947

944-
//st := time.Now()
945948
if size == 0 { // Question: Would this alawys equal to zero?
946949
var sizeBuf [4]byte
947950
if bytesRead, err := io.ReadFull(conn.nc, sizeBuf[:]); err != nil {
@@ -989,7 +992,6 @@ func attemptPendingRead(ctx context.Context, conn *connection, remainingTime tim
989992
nerr := net.Error(nil)
990993
if l := int32(n); l == 0 && errors.As(err, &nerr) && nerr.Timeout() {
991994
pendingreadState.remainingBytes = l + pendingreadState.remainingBytes
992-
//prs.remainingTime = ptrutil.Ptr(*prs.remainingTime - time.Since(st))
993995
}
994996

995997
err = transformNetworkError(ctx, err, contextDeadlineUsed)
@@ -1017,22 +1019,17 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
10171019
publishPendingReadStarted(pool, conn)
10181020

10191021
var (
1020-
pendingReadState = conn.pendingReadState
1021-
remainingTime = pendingReadState.start.Add(PendingReadTimeout).Sub(time.Now())
1022-
err error
1023-
bytesRead int
1024-
contextDeadlineUsed bool
1022+
pendingReadState = conn.pendingReadState
1023+
remainingTime = pendingReadState.start.Add(PendingReadTimeout).Sub(time.Now())
1024+
err error
1025+
bytesRead int
10251026
)
10261027

10271028
if remainingTime <= 0 {
10281029
// If there is no remaining time, we can just peek at the connection to check
10291030
// aliveness. In such cases, we don't want to close the connection.
10301031
bytesRead, err = peekConnectionAlive(conn)
10311032
} else {
1032-
//pendingReadContext, cancel := newPendingReadContext(ctx, remainingTime)
1033-
//defer cancel()
1034-
1035-
contextDeadlineUsed = true
10361033
bytesRead, err = attemptPendingRead(ctx, conn, remainingTime)
10371034
}
10381035

@@ -1057,7 +1054,7 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
10571054
if err := conn.close(); err != nil {
10581055
return err
10591056
}
1060-
return transformNetworkError(ctx, err, contextDeadlineUsed)
1057+
return err
10611058
}
10621059
}
10631060

0 commit comments

Comments
 (0)