Skip to content

Commit 9460d8d

Browse files
Replace bg reads with fg reads
1 parent f132378 commit 9460d8d

File tree

8 files changed

+529
-392
lines changed

8 files changed

+529
-392
lines changed

event/monitoring.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,24 +91,29 @@ const (
9191

9292
// strings for pool command monitoring types
9393
const (
94-
PoolCreated = "ConnectionPoolCreated"
95-
PoolReady = "ConnectionPoolReady"
96-
PoolCleared = "ConnectionPoolCleared"
97-
PoolClosedEvent = "ConnectionPoolClosed"
98-
ConnectionCreated = "ConnectionCreated"
99-
ConnectionReady = "ConnectionReady"
100-
ConnectionClosed = "ConnectionClosed"
101-
GetStarted = "ConnectionCheckOutStarted"
102-
GetFailed = "ConnectionCheckOutFailed"
103-
GetSucceeded = "ConnectionCheckedOut"
104-
ConnectionReturned = "ConnectionCheckedIn"
94+
PoolCreated = "ConnectionPoolCreated"
95+
PoolReady = "ConnectionPoolReady"
96+
PoolCleared = "ConnectionPoolCleared"
97+
PoolClosedEvent = "ConnectionPoolClosed"
98+
ConnectionCreated = "ConnectionCreated"
99+
ConnectionReady = "ConnectionReady"
100+
ConnectionClosed = "ConnectionClosed"
101+
GetStarted = "ConnectionCheckOutStarted"
102+
GetFailed = "ConnectionCheckOutFailed"
103+
GetSucceeded = "ConnectionCheckedOut"
104+
ConnectionReturned = "ConnectionCheckedIn"
105+
ConnectionPendingReadStarted = "ConnectionPendingReadStarted"
106+
ConnectionPendingReadSucceeded = "ConnectionPendingReadSucceeded"
107+
ConnectionPendingReadFailed = "ConnectionPendingReadFailed"
105108
)
106109

107110
// MonitorPoolOptions contains pool options as formatted in pool events
108111
type MonitorPoolOptions struct {
109-
MaxPoolSize uint64 `json:"maxPoolSize"`
110-
MinPoolSize uint64 `json:"minPoolSize"`
111-
WaitQueueTimeoutMS uint64 `json:"maxIdleTimeMS"`
112+
MaxPoolSize uint64 `json:"maxPoolSize"`
113+
MinPoolSize uint64 `json:"minPoolSize"`
114+
WaitQueueTimeoutMS uint64 `json:"maxIdleTimeMS"`
115+
RequestID int32 `json:"requestId"`
116+
RemainingTime time.Duration `json:"remainingTime"`
112117
}
113118

114119
// PoolEvent contains all information summarizing a pool event

internal/driverutil/context.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package driverutil
2+
3+
import "context"
4+
5+
type ContextKey string
6+
7+
const (
8+
ContextKeyHasMaxTimeMS ContextKey = "hasMaxTimeMS"
9+
ContextKeyRequestID ContextKey = "requestID"
10+
)
11+
12+
func WithValueHasMaxTimeMS(parentCtx context.Context, val bool) context.Context {
13+
return context.WithValue(parentCtx, ContextKeyHasMaxTimeMS, val)
14+
}
15+
16+
func WithRequestID(parentCtx context.Context, requestID int32) context.Context {
17+
return context.WithValue(parentCtx, ContextKeyRequestID, requestID)
18+
}
19+
20+
func HasMaxTimeMS(ctx context.Context) bool {
21+
return ctx.Value(ContextKeyHasMaxTimeMS) != nil
22+
}
23+
24+
func GetRequestID(ctx context.Context) (int32, bool) {
25+
val, ok := ctx.Value(ContextKeyRequestID).(int32)
26+
27+
return val, ok
28+
}

internal/logger/component.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ const (
2828
ConnectionCheckoutFailed = "Connection checkout failed"
2929
ConnectionCheckedOut = "Connection checked out"
3030
ConnectionCheckedIn = "Connection checked in"
31+
ConnectionPendingReadStarted = "Pending read started"
32+
ConnectionPendingReadSucceeded = "Pending read succeeded"
33+
ConnectionPendingReadFailed = "Pending read failed"
3134
ServerSelectionFailed = "Server selection failed"
3235
ServerSelectionStarted = "Server selection started"
3336
ServerSelectionSucceeded = "Server selection succeeded"

internal/ptrutil/ptr.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Copyright (C) MongoDB, Inc. 2024-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package ptrutil
8+
9+
// Ptr will return the memory location of the given value.
10+
func Ptr[T any](val T) *T {
11+
return &val
12+
}

x/mongo/driver/operation.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,12 @@ func (op Operation) Execute(ctx context.Context) error {
807807
if moreToCome {
808808
roundTrip = op.moreToComeRoundTrip
809809
}
810+
811+
if maxTimeMS != 0 {
812+
ctx = driverutil.WithValueHasMaxTimeMS(ctx, true)
813+
ctx = driverutil.WithRequestID(ctx, startedInfo.requestID)
814+
}
815+
810816
res, err = roundTrip(ctx, conn, *wm)
811817

812818
if ep, ok := srvr.(ErrorProcessor); ok {

x/mongo/driver/topology/connection.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
"go.mongodb.org/mongo-driver/internal/csot"
24+
"go.mongodb.org/mongo-driver/internal/driverutil"
2425
"go.mongodb.org/mongo-driver/mongo/address"
2526
"go.mongodb.org/mongo-driver/mongo/description"
2627
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
@@ -46,6 +47,12 @@ var (
4647

4748
func nextConnectionID() uint64 { return atomic.AddUint64(&globalConnectionID, 1) }
4849

50+
type pendingReadState struct {
51+
remainingBytes int32
52+
requestID int32
53+
remainingTime *time.Duration
54+
}
55+
4956
type connection struct {
5057
// state must be accessed using the atomic package and should be at the beginning of the struct.
5158
// - atomic bug: https://pkg.go.dev/sync/atomic#pkg-note-BUG
@@ -83,7 +90,8 @@ type connection struct {
8390

8491
// awaitRemainingBytes indicates the size of server response that was not completely
8592
// read before returning the connection to the pool.
86-
awaitRemainingBytes *int32
93+
// awaitRemainingBytes *int32
94+
pendingReadState *pendingReadState
8795

8896
// oidcTokenGenID is the monotonic generation ID for OIDC tokens, used to invalidate
8997
// accessTokens in the OIDC authenticator cache.
@@ -452,7 +460,7 @@ func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) {
452460

453461
dst, errMsg, err := c.read(ctx)
454462
if err != nil {
455-
if c.awaitRemainingBytes == nil {
463+
if c.pendingReadState == nil {
456464
// If the connection was not marked as awaiting response, use the
457465
// pre-CSOT behavior and close the connection because we don't know
458466
// if there are other bytes left to read.
@@ -523,8 +531,13 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
523531
// reading messages from an exhaust cursor.
524532
n, err := io.ReadFull(c.nc, sizeBuf[:])
525533
if err != nil {
526-
if l := int32(n); l == 0 && isCSOTTimeout(err) {
527-
c.awaitRemainingBytes = &l
534+
if l := int32(n); l == 0 && isCSOTTimeout(err) && driverutil.HasMaxTimeMS(ctx) {
535+
requestID, _ := driverutil.GetRequestID(ctx)
536+
537+
c.pendingReadState = &pendingReadState{
538+
remainingBytes: l,
539+
requestID: requestID,
540+
}
528541
}
529542
return nil, "incomplete read of message header", err
530543
}
@@ -539,8 +552,13 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
539552
n, err = io.ReadFull(c.nc, dst[4:])
540553
if err != nil {
541554
remainingBytes := size - 4 - int32(n)
542-
if remainingBytes > 0 && isCSOTTimeout(err) {
543-
c.awaitRemainingBytes = &remainingBytes
555+
if remainingBytes > 0 && isCSOTTimeout(err) && driverutil.HasMaxTimeMS(ctx) {
556+
requestID, _ := driverutil.GetRequestID(ctx)
557+
558+
c.pendingReadState = &pendingReadState{
559+
remainingBytes: remainingBytes,
560+
requestID: requestID,
561+
}
544562
}
545563
return dst, "incomplete read of full message", err
546564
}

0 commit comments

Comments
 (0)