@@ -578,9 +578,10 @@ func (c initConnection) SupportsStreaming() bool {
578
578
}
579
579
580
580
// Connection implements the driver.Connection interface to allow reading and writing wire
581
- // messages and the driver.Expirable interface to allow expiring.
581
+ // messages and the driver.Expirable interface to allow expiring. It wraps an underlying
582
+ // topology.connection to make it more goroutine-safe and nil-safe.
582
583
type Connection struct {
583
- * connection
584
+ connection * connection
584
585
refCount int
585
586
cleanupPoolFn func ()
586
587
@@ -602,7 +603,7 @@ func (c *Connection) WriteWireMessage(ctx context.Context, wm []byte) error {
602
603
if c .connection == nil {
603
604
return ErrConnectionClosed
604
605
}
605
- return c .writeWireMessage (ctx , wm )
606
+ return c .connection . writeWireMessage (ctx , wm )
606
607
}
607
608
608
609
// ReadWireMessage handles reading a wire message from the underlying connection. The dst parameter
@@ -613,7 +614,7 @@ func (c *Connection) ReadWireMessage(ctx context.Context, dst []byte) ([]byte, e
613
614
if c .connection == nil {
614
615
return dst , ErrConnectionClosed
615
616
}
616
- return c .readWireMessage (ctx , dst )
617
+ return c .connection . readWireMessage (ctx , dst )
617
618
}
618
619
619
620
// CompressWireMessage handles compressing the provided wire message using the underlying
@@ -656,7 +657,7 @@ func (c *Connection) Description() description.Server {
656
657
if c .connection == nil {
657
658
return description.Server {}
658
659
}
659
- return c .desc
660
+ return c .connection . desc
660
661
}
661
662
662
663
// Close returns this connection to the connection pool. This method may not closeConnection the underlying
@@ -679,12 +680,12 @@ func (c *Connection) Expire() error {
679
680
return nil
680
681
}
681
682
682
- _ = c .close ()
683
+ _ = c .connection . close ()
683
684
return c .cleanupReferences ()
684
685
}
685
686
686
687
func (c * Connection ) cleanupReferences () error {
687
- err := c .pool .checkIn (c .connection )
688
+ err := c .connection . pool .checkIn (c .connection )
688
689
if c .cleanupPoolFn != nil {
689
690
c .cleanupPoolFn ()
690
691
c .cleanupPoolFn = nil
@@ -709,14 +710,22 @@ func (c *Connection) ID() string {
709
710
if c .connection == nil {
710
711
return "<closed>"
711
712
}
712
- return c .id
713
+ return c .connection .id
714
+ }
715
+
716
+ // ServerConnectionID returns the server connection ID of this connection.
717
+ func (c * Connection ) ServerConnectionID () * int32 {
718
+ if c .connection == nil {
719
+ return nil
720
+ }
721
+ return c .connection .serverConnectionID
713
722
}
714
723
715
724
// Stale returns if the connection is stale.
716
725
func (c * Connection ) Stale () bool {
717
726
c .mu .RLock ()
718
727
defer c .mu .RUnlock ()
719
- return c .pool .stale (c .connection )
728
+ return c .connection . pool .stale (c .connection )
720
729
}
721
730
722
731
// Address returns the address of this connection.
@@ -726,27 +735,27 @@ func (c *Connection) Address() address.Address {
726
735
if c .connection == nil {
727
736
return address .Address ("0.0.0.0" )
728
737
}
729
- return c .addr
738
+ return c .connection . addr
730
739
}
731
740
732
741
// LocalAddress returns the local address of the connection
733
742
func (c * Connection ) LocalAddress () address.Address {
734
743
c .mu .RLock ()
735
744
defer c .mu .RUnlock ()
736
- if c .connection == nil || c .nc == nil {
745
+ if c .connection == nil || c .connection . nc == nil {
737
746
return address .Address ("0.0.0.0" )
738
747
}
739
- return address .Address (c .nc .LocalAddr ().String ())
748
+ return address .Address (c .connection . nc .LocalAddr ().String ())
740
749
}
741
750
742
751
// PinToCursor updates this connection to reflect that it is pinned to a cursor.
743
752
func (c * Connection ) PinToCursor () error {
744
- return c .pin ("cursor" , c .pool .pinConnectionToCursor , c .pool .unpinConnectionFromCursor )
753
+ return c .pin ("cursor" , c .connection . pool .pinConnectionToCursor , c . connection .pool .unpinConnectionFromCursor )
745
754
}
746
755
747
756
// PinToTransaction updates this connection to reflect that it is pinned to a transaction.
748
757
func (c * Connection ) PinToTransaction () error {
749
- return c .pin ("transaction" , c .pool .pinConnectionToTransaction , c .pool .unpinConnectionFromTransaction )
758
+ return c .pin ("transaction" , c .connection . pool .pinConnectionToTransaction , c . connection .pool .unpinConnectionFromTransaction )
750
759
}
751
760
752
761
func (c * Connection ) pin (reason string , updatePoolFn , cleanupPoolFn func ()) error {
0 commit comments