Skip to content

Commit 14ca5ae

Browse files
Advance logic to meet TD
1 parent 67eef26 commit 14ca5ae

File tree

9 files changed

+119
-49
lines changed

9 files changed

+119
-49
lines changed

event/monitoring.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,11 @@ type PoolEvent struct {
108108
Reason string `json:"reason"`
109109
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
110110
// can be used to distinguish between individual servers in a load balanced deployment.
111-
ServiceID *bson.ObjectID `json:"serviceId"`
112-
Interruption bool `json:"interruptInUseConnections"`
113-
Error error `json:"error"`
111+
ServiceID *bson.ObjectID `json:"serviceId"`
112+
Interruption bool `json:"interruptInUseConnections"`
113+
Error error `json:"error"`
114+
RequestID int32 `json:"requestId"`
115+
RemainingTime time.Duration `json:"remainingTime"`
114116
}
115117

116118
// PoolMonitor is a function that allows the user to gain access to events occurring in the pool

internal/logger/component.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ const (
2828
ConnectionCheckoutFailed = "Connection checkout failed"
2929
ConnectionCheckedOut = "Connection checked out"
3030
ConnectionCheckedIn = "Connection checked in"
31+
ConnectionPendingReadStarted = "Pending read started"
32+
ConnectionPendingReadSucceeded = "Pending read succeeded"
33+
ConnectionPendingReadFailed = "Pending read failed"
3134
ServerSelectionFailed = "Server selection failed"
3235
ServerSelectionStarted = "Server selection started"
3336
ServerSelectionSucceeded = "Server selection succeeded"

x/mongo/driver/drivertest/channel_conn.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.mongodb.org/mongo-driver/v2/mongo/address"
1414
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
1515
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
16+
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/mnet"
1617
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/wiremessage"
1718
)
1819

@@ -52,7 +53,7 @@ func (c *ChannelConn) Write(ctx context.Context, wm []byte) error {
5253
}
5354

5455
// ReadWireMessage implements the driver.Connection interface.
55-
func (c *ChannelConn) Read(ctx context.Context) ([]byte, error) {
56+
func (c *ChannelConn) Read(ctx context.Context, _ ...mnet.ReadOption) ([]byte, error) {
5657
var wm []byte
5758
var err error
5859
select {

x/mongo/driver/drivertest/opmsg_deployment.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (c *connection) SetOIDCTokenGenID(uint64) {
6868
}
6969

7070
// Read returns the next response in the connection's list of responses.
71-
func (c *connection) Read(_ context.Context) ([]byte, error) {
71+
func (c *connection) Read(_ context.Context, _ ...mnet.ReadOption) ([]byte, error) {
7272
var dst []byte
7373
if len(c.responses) == 0 {
7474
return dst, errors.New("no responses remaining")

x/mongo/driver/mnet/connection.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,29 @@ import (
1414
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
1515
)
1616

17+
type ReadOption func(*ReadOptions)
18+
19+
type ReadOptions struct {
20+
HasMaxTimeMS bool
21+
RequestID int32
22+
}
23+
24+
func WithReadMaxTimeMS() ReadOption {
25+
return func(opts *ReadOptions) {
26+
opts.HasMaxTimeMS = true
27+
}
28+
}
29+
30+
func WithRequestID(requestID int32) ReadOption {
31+
return func(opts *ReadOptions) {
32+
opts.RequestID = requestID
33+
}
34+
}
35+
1736
// ReadWriteCloser represents a Connection where server operations
1837
// can read from, written to, and closed.
1938
type ReadWriteCloser interface {
20-
Read(ctx context.Context) ([]byte, error)
39+
Read(ctx context.Context, opts ...ReadOption) ([]byte, error)
2140
Write(ctx context.Context, wm []byte) error
2241
io.Closer
2342
}

x/mongo/driver/operation.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,18 @@ func (op Operation) Execute(ctx context.Context) error {
792792
if moreToCome {
793793
roundTrip = op.moreToComeRoundTrip
794794
}
795-
res, err = roundTrip(ctx, conn, *wm)
795+
796+
readOpts := []mnet.ReadOption{}
797+
if maxTimeMS != 0 {
798+
readOpts = append(readOpts, mnet.WithReadMaxTimeMS())
799+
readOpts = append(readOpts, mnet.WithRequestID(startedInfo.requestID))
800+
}
801+
802+
// Inform the roundTrip if maxTimeMS is set. If it is and the operation
803+
// times out, then the connection should be put into a "pending" state
804+
// so that the next time it is checked out it attempts to finish the read
805+
// which is almost certainly a server error noting a timeout.
806+
res, err = roundTrip(ctx, conn, *wm, readOpts)
796807

797808
if ep, ok := srvr.(ErrorProcessor); ok {
798809
_ = ep.ProcessError(err, conn)
@@ -1076,16 +1087,25 @@ func (op Operation) retryable(desc description.Server) bool {
10761087

10771088
// roundTrip writes a wiremessage to the connection and then reads a wiremessage. The wm parameter
10781089
// is reused when reading the wiremessage.
1079-
func (op Operation) roundTrip(ctx context.Context, conn *mnet.Connection, wm []byte) ([]byte, error) {
1090+
func (op Operation) roundTrip(
1091+
ctx context.Context,
1092+
conn *mnet.Connection,
1093+
wm []byte,
1094+
readOpts []mnet.ReadOption,
1095+
) ([]byte, error) {
10801096
err := conn.Write(ctx, wm)
10811097
if err != nil {
10821098
return nil, op.networkError(err)
10831099
}
1084-
return op.readWireMessage(ctx, conn)
1100+
return op.readWireMessage(ctx, conn, readOpts...)
10851101
}
10861102

1087-
func (op Operation) readWireMessage(ctx context.Context, conn *mnet.Connection) (result []byte, err error) {
1088-
wm, err := conn.Read(ctx)
1103+
func (op Operation) readWireMessage(
1104+
ctx context.Context,
1105+
conn *mnet.Connection,
1106+
opts ...mnet.ReadOption,
1107+
) (result []byte, err error) {
1108+
wm, err := conn.Read(ctx, opts...)
10891109
if err != nil {
10901110
return nil, op.networkError(err)
10911111
}
@@ -1156,7 +1176,12 @@ func (op Operation) networkError(err error) error {
11561176

11571177
// moreToComeRoundTrip writes a wiremessage to the provided connection. This is used when an OP_MSG is
11581178
// being sent with the moreToCome bit set.
1159-
func (op *Operation) moreToComeRoundTrip(ctx context.Context, conn *mnet.Connection, wm []byte) (result []byte, err error) {
1179+
func (op *Operation) moreToComeRoundTrip(
1180+
ctx context.Context,
1181+
conn *mnet.Connection,
1182+
wm []byte,
1183+
_ []mnet.ReadOption,
1184+
) (result []byte, err error) {
11601185
err = conn.Write(ctx, wm)
11611186
if err != nil {
11621187
if op.Client != nil {

x/mongo/driver/operation_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -793,7 +793,7 @@ func (m *mockConnection) Write(_ context.Context, wm []byte) error {
793793
return m.rWriteErr
794794
}
795795

796-
func (m *mockConnection) Read(_ context.Context) ([]byte, error) {
796+
func (m *mockConnection) Read(_ context.Context, _ ...mnet.ReadOption) ([]byte, error) {
797797
return m.rReadWM, m.rReadErr
798798
}
799799

x/mongo/driver/topology/connection.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type connection struct {
8686
// awaitRemainingBytes indicates the size of server response that was not completely
8787
// read before returning the connection to the pool.
8888
awaitRemainingBytes *int32
89+
requestID int32
8990
remainingTime *time.Duration
9091
pendingReadMU sync.Mutex
9192
}
@@ -396,7 +397,7 @@ func (c *connection) write(ctx context.Context, wm []byte) (err error) {
396397
}
397398

398399
// readWireMessage reads a wiremessage from the connection. The dst parameter will be overwritten.
399-
func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) {
400+
func (c *connection) readWireMessage(ctx context.Context, opts ...mnet.ReadOption) ([]byte, error) {
400401
if atomic.LoadInt64(&c.state) != connConnected {
401402
return nil, ConnectionError{
402403
ConnectionID: c.id,
@@ -409,7 +410,7 @@ func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) {
409410
return nil, ConnectionError{ConnectionID: c.id, Wrapped: err, message: "failed to set read deadline"}
410411
}
411412

412-
dst, errMsg, err := c.read(ctx)
413+
dst, errMsg, err := c.read(ctx, opts...)
413414
if err != nil {
414415
c.pendingReadMU.Lock()
415416
if c.awaitRemainingBytes == nil {
@@ -452,7 +453,7 @@ func (c *connection) parseWmSizeBytes(wmSizeBytes [4]byte) (int32, error) {
452453
return size, nil
453454
}
454455

455-
func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string, err error) {
456+
func (c *connection) read(ctx context.Context, opts ...mnet.ReadOption) (bytesRead []byte, errMsg string, err error) {
456457
go c.cancellationListener.Listen(ctx, c.cancellationListenerCallback)
457458
defer func() {
458459
// If the context is cancelled after we finish reading the server response, the cancellation listener could fire
@@ -465,6 +466,11 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
465466
}
466467
}()
467468

469+
readOpts := mnet.ReadOptions{}
470+
for _, opt := range opts {
471+
opt(&readOpts)
472+
}
473+
468474
isCSOTTimeout := func(err error) bool {
469475
// If the error was a timeout error, instead of closing the
470476
// connection mark it as awaiting response so the pool can read the
@@ -482,9 +488,10 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
482488
// reading messages from an exhaust cursor.
483489
n, err := io.ReadFull(c.nc, sizeBuf[:])
484490
if err != nil {
485-
if l := int32(n); l == 0 && isCSOTTimeout(err) {
491+
if l := int32(n); l == 0 && isCSOTTimeout(err) && readOpts.HasMaxTimeMS {
486492
c.pendingReadMU.Lock()
487493
c.awaitRemainingBytes = &l
494+
c.requestID = readOpts.RequestID
488495
c.remainingTime = ptrutil.Ptr(PendingReadTimeout)
489496
c.pendingReadMU.Unlock()
490497
}
@@ -501,9 +508,10 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
501508
n, err = io.ReadFull(c.nc, dst[4:])
502509
if err != nil {
503510
remainingBytes := size - 4 - int32(n)
504-
if remainingBytes > 0 && isCSOTTimeout(err) {
511+
if remainingBytes > 0 && isCSOTTimeout(err) && readOpts.HasMaxTimeMS {
505512
c.pendingReadMU.Lock()
506513
c.awaitRemainingBytes = &remainingBytes
514+
c.requestID = readOpts.RequestID
507515
c.remainingTime = ptrutil.Ptr(PendingReadTimeout)
508516
c.pendingReadMU.Unlock()
509517
}
@@ -664,8 +672,8 @@ func (c initConnection) LocalAddress() address.Address {
664672
func (c initConnection) Write(ctx context.Context, wm []byte) error {
665673
return c.writeWireMessage(ctx, wm)
666674
}
667-
func (c initConnection) Read(ctx context.Context) ([]byte, error) {
668-
return c.readWireMessage(ctx)
675+
func (c initConnection) Read(ctx context.Context, opts ...mnet.ReadOption) ([]byte, error) {
676+
return c.readWireMessage(ctx, opts...)
669677
}
670678
func (c initConnection) SetStreaming(streaming bool) {
671679
c.setStreaming(streaming)
@@ -712,13 +720,13 @@ func (c *Connection) Write(ctx context.Context, wm []byte) error {
712720

713721
// ReadWireMessage handles reading a wire message from the underlying connection. The dst parameter
714722
// will be overwritten with the new wire message.
715-
func (c *Connection) Read(ctx context.Context) ([]byte, error) {
723+
func (c *Connection) Read(ctx context.Context, opts ...mnet.ReadOption) ([]byte, error) {
716724
c.mu.RLock()
717725
defer c.mu.RUnlock()
718726
if c.connection == nil {
719727
return nil, ErrConnectionClosed
720728
}
721-
return c.connection.readWireMessage(ctx)
729+
return c.connection.readWireMessage(ctx, opts...)
722730
}
723731

724732
// CompressWireMessage handles compressing the provided wire message using the underlying

x/mongo/driver/topology/pool.go

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -791,7 +791,7 @@ func (p *pool) removeConnection(conn *connection, reason reason, err error) erro
791791
// Deprecated: PendingReadTimeout is intended for internal use only and may be
792792
// removed or modified at any time.
793793

794-
var PendingReadTimeout = 1 * time.Second
794+
var PendingReadTimeout = 400 * time.Millisecond
795795

796796
// awaitPendingRead sets a new read deadline on the provided connection and
797797
// tries to read any bytes returned by the server. If there are any errors, the
@@ -805,18 +805,32 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
805805
return nil
806806
}
807807

808-
//if pool.monitor != nil {
809-
// pool.monitor.Event(&event.PoolEvent{
810-
// Type: event.ConnectionPendingReadStarted,
811-
// ConnectionID: conn.driverConnectionID,
812-
// })
813-
//}
808+
if mustLogPoolMessage(pool) {
809+
keysAndValues := logger.KeyValues{
810+
logger.KeyDriverConnectionID, conn.driverConnectionID,
811+
logger.KeyRequestID, conn.requestID,
812+
}
813+
814+
logPoolMessage(pool, logger.ConnectionPendingReadStarted, keysAndValues...)
815+
}
814816

815817
size := *conn.awaitRemainingBytes
816818

817819
checkIn := false
820+
var someErr error
818821

819822
defer func() {
823+
if mustLogPoolMessage(pool) && someErr != nil {
824+
keysAndValues := logger.KeyValues{
825+
logger.KeyDriverConnectionID, conn.driverConnectionID,
826+
logger.KeyRequestID, conn.requestID,
827+
logger.KeyReason, someErr.Error(),
828+
logger.KeyRemainingTimeMS, *conn.remainingTime,
829+
}
830+
831+
logPoolMessage(pool, logger.ConnectionPendingReadFailed, keysAndValues...)
832+
}
833+
820834
// If we have exceeded the time limit, then close the connection.
821835
if conn.remainingTime != nil && *conn.remainingTime < 0 {
822836
if err := conn.close(); err != nil {
@@ -830,14 +844,6 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
830844
return
831845
}
832846

833-
//if pool.monitor != nil {
834-
// pool.monitor.Event(&event.PoolEvent{
835-
// Type: event.ConnectionPendingReadFailed,
836-
// ConnectionID: conn.driverConnectionID,
837-
// //Reason: readErr.Error(),
838-
// })
839-
//}
840-
841847
// No matter what happens, always check the connection back into the
842848
// pool, which will either make it available for other operations or
843849
// remove it from the pool if it was closed.
@@ -857,7 +863,10 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
857863
err := conn.nc.SetReadDeadline(dl)
858864
if err != nil {
859865
checkIn = true
860-
return fmt.Errorf("error setting a read deadline: %w", err)
866+
867+
someErr = fmt.Errorf("error setting a read deadline: %w", err)
868+
869+
return someErr
861870
}
862871

863872
st := time.Now()
@@ -869,15 +878,16 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
869878
checkIn = true
870879

871880
err = transformNetworkError(ctx, err, contextDeadlineUsed)
881+
someErr = fmt.Errorf("error reading the message size: %w", err)
872882

873-
return fmt.Errorf("error reading the message size: %w", err)
883+
return someErr
874884
}
875885
size, err = conn.parseWmSizeBytes(sizeBuf)
876886
if err != nil {
877887
checkIn = true
878-
err = transformNetworkError(ctx, err, contextDeadlineUsed)
888+
someErr = transformNetworkError(ctx, err, contextDeadlineUsed)
879889

880-
return err
890+
return someErr
881891
}
882892
size -= 4
883893
}
@@ -893,17 +903,19 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
893903
checkIn = true
894904

895905
err = transformNetworkError(ctx, err, contextDeadlineUsed)
906+
someErr = fmt.Errorf("error discarding %d byte message: %w", size, err)
896907

897-
return fmt.Errorf("error discarding %d byte message: %w", size, err)
908+
return someErr
898909
}
899910

900-
//if pool.monitor != nil {
901-
// pool.monitor.Event(&event.PoolEvent{
902-
// Type: event.ConnectionPendingReadSucceeded,
903-
// ConnectionID: conn.driverConnectionID,
904-
// //Reason: readErr.Error(),
905-
// })
906-
//}
911+
if mustLogPoolMessage(pool) {
912+
keysAndValues := logger.KeyValues{
913+
logger.KeyDriverConnectionID, conn.driverConnectionID,
914+
logger.KeyRequestID, conn.requestID,
915+
}
916+
917+
logPoolMessage(pool, logger.ConnectionPendingReadSucceeded, keysAndValues...)
918+
}
907919

908920
conn.awaitRemainingBytes = nil
909921
conn.remainingTime = nil

0 commit comments

Comments
 (0)