|
6 | 6 | "errors"
|
7 | 7 | "fmt"
|
8 | 8 | "net"
|
| 9 | + "sync" |
9 | 10 | "sync/atomic"
|
10 | 11 | "time"
|
11 | 12 |
|
@@ -33,6 +34,10 @@ type Conn struct {
|
33 | 34 | bw *bufio.Writer
|
34 | 35 | wr *proto.Writer
|
35 | 36 |
|
| 37 | + // Lightweight mutex to protect reader operations during handoff |
| 38 | + // Only used for the brief period during SetNetConn and HasBufferedData/PeekReplyTypeSafe |
| 39 | + readerMu sync.RWMutex |
| 40 | + |
36 | 41 | Inited bool
|
37 | 42 | pooled bool
|
38 | 43 | createdAt time.Time
|
@@ -319,7 +324,13 @@ func (cn *Conn) ExecuteInitConn(ctx context.Context) error {
|
319 | 324 | func (cn *Conn) SetNetConn(netConn net.Conn) {
|
320 | 325 | // Store the new connection atomically first (lock-free)
|
321 | 326 | cn.setNetConn(netConn)
|
| 327 | + |
| 328 | + // Protect reader reset operations to avoid data races |
| 329 | + // Use write lock since we're modifying the reader state |
| 330 | + cn.readerMu.Lock() |
322 | 331 | cn.rd.Reset(netConn)
|
| 332 | + cn.readerMu.Unlock() |
| 333 | + |
323 | 334 | cn.bw.Reset(netConn)
|
324 | 335 | }
|
325 | 336 |
|
@@ -393,20 +404,29 @@ func (cn *Conn) Rd() *proto.Reader {
|
393 | 404 | }
|
394 | 405 |
|
395 | 406 | // Reader returns the connection's proto reader for processing notifications
|
| 407 | +// Note: This method should be used carefully as it returns the raw reader. |
| 408 | +// For thread-safe operations, use HasBufferedData() and PeekReplyTypeSafe(). |
396 | 409 | func (cn *Conn) Reader() *proto.Reader {
|
397 | 410 | return cn.rd
|
398 | 411 | }
|
399 | 412 |
|
400 | 413 | // HasBufferedData safely checks if the connection has buffered data.
|
401 | 414 | // This method is used to avoid data races when checking for push notifications.
|
402 | 415 | func (cn *Conn) HasBufferedData() bool {
|
| 416 | + // Use read lock for concurrent access to reader state |
| 417 | + cn.readerMu.RLock() |
| 418 | + defer cn.readerMu.RUnlock() |
403 | 419 | return cn.rd.Buffered() > 0
|
404 | 420 | }
|
405 | 421 |
|
406 | 422 | // PeekReplyTypeSafe safely peeks at the reply type.
|
407 | 423 | // This method is used to avoid data races when checking for push notifications.
|
408 | 424 | func (cn *Conn) PeekReplyTypeSafe() (byte, error) {
|
409 |
| - if !cn.HasBufferedData() { |
| 425 | + // Use read lock for concurrent access to reader state |
| 426 | + cn.readerMu.RLock() |
| 427 | + defer cn.readerMu.RUnlock() |
| 428 | + |
| 429 | + if cn.rd.Buffered() <= 0 { |
410 | 430 | return 0, fmt.Errorf("redis: can't peek reply type, no data available")
|
411 | 431 | }
|
412 | 432 | return cn.rd.PeekReplyType()
|
|
0 commit comments