Skip to content

Commit 5c06377

Browse files
felippeduran (Felippe Durán) and hspedro
authored
Fix some unique session kick failures preventing clients from binding new sessions (#452)
* Remove UID from session pool and clean subscriptions correctly when kicking user * Force close old session if kick fails when enforcing unique sessions * Wrap errors within agent kick to facilitate debugging * Add integration test case to validate unique sessions module basic behavior * Use correct deadline exceeded error to validate write timeout * Fix SessionCount being decremented if session is already closed and Close is called again * Force close old session if kick fails due to 'use of closed connection' when enforcing unique sessions * Improve logs * Rename new unique session private method * Add e2e test to validate unique session doesn't fail if kick fails * Add comment to propose future improvement * chore: update go to 1.23 * chore(ci): fix setup with goveralls deps * feat(session): propagate network errors on Kick Kick might fail due to encoding and various network errors during write. Also, if Kick tries to write to a closed connection (fd) it would do so without timeout, defaulting to the OS timeouts in the order of minutes. The new Kick behavior: - Uses writeConnection, which sets write deadlines - Propagates encode errors so the client can decide to retry - Propagates network errors so the client can decide to ignore them Knowing the error is especially useful in the unique_session module, in which we need to ensure the existing session is always closed, regardless of a successful Kick. --------- Co-authored-by: Felippe Durán <felippe.duran@wildlifestudios.com> Co-authored-by: Pedro Soares <pedro.soares@wildlifestudios.com>
1 parent e24f252 commit 5c06377

File tree

19 files changed

+634
-102
lines changed

19 files changed

+634
-102
lines changed

.github/workflows/tests.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ jobs:
4343
- name: Send coverage
4444
env:
4545
COVERALLS_TOKEN: ${{ secrets.GITHUB_TOKEN }}
46-
run: ~/go/bin/goveralls -coverprofile=coverprofile.out -service=github
46+
run: |
47+
export PATH=$PATH:$(go env GOPATH)/bin
48+
goveralls -coverprofile=coverprofile.out -service=github
4749
e2e-test-nats:
4850
name: Nats Test End to End
4951
runs-on: ubuntu-latest

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ init-submodules:
2929
@git submodule init
3030

3131
setup-ci:
32-
@go install github.com/mattn/goveralls
33-
@go get -u github.com/wadey/gocovmerge
32+
@go install github.com/mattn/goveralls@latest
33+
@go install github.com/wadey/gocovmerge@latest
3434

3535
setup-protobuf-macos:
3636
@brew install protobuf

agent/agent.go

Lines changed: 82 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"strings"
3131
"sync"
3232
"sync/atomic"
33+
"syscall"
3334
"time"
3435

3536
"github.com/topfreegames/pitaya/v2/config"
@@ -190,6 +191,7 @@ func newAgent(
190191
// initialize heartbeat and handshake data on first user connection
191192
serializerName := serializer.GetName()
192193

194+
// TODO: Remove this once.Do and move the validation somewhere else, maybe during pitaya initialization. The current approach makes tests interfere with each other quite easily.
193195
once.Do(func() {
194196
hbdEncode(heartbeatTime, packetEncoder, messageEncoder.IsCompressionEnabled(), serializerName)
195197
herdEncode(heartbeatTime, packetEncoder, messageEncoder.IsCompressionEnabled(), serializerName)
@@ -320,14 +322,22 @@ func (a *agentImpl) Push(route string, v interface{}) error {
320322
return errors.NewError(constants.ErrBrokenPipe, errors.ErrClientClosedRequest)
321323
}
322324

325+
logger := logger.Log.WithFields(map[string]interface{}{
326+
"type": "Push",
327+
"session_id": a.Session.ID(),
328+
"uid": a.Session.UID(),
329+
"route": route,
330+
})
331+
323332
switch d := v.(type) {
324333
case []byte:
325-
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes",
326-
a.Session.ID(), a.Session.UID(), route, len(d))
334+
logger = logger.WithField("bytes", len(d))
327335
default:
328-
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v",
329-
a.Session.ID(), a.Session.UID(), route, v)
336+
logger = logger.WithField("data", fmt.Sprintf("%+v", d))
330337
}
338+
339+
logger.Debugf("pushing message to session")
340+
331341
return a.send(pendingMessage{typ: message.Push, route: route, payload: v})
332342
}
333343

@@ -346,15 +356,22 @@ func (a *agentImpl) ResponseMID(ctx context.Context, mid uint, v interface{}, is
346356
return constants.ErrSessionOnNotify
347357
}
348358

359+
logger := logger.Log.WithFields(map[string]interface{}{
360+
"type": "Push",
361+
"session_id": a.Session.ID(),
362+
"uid": a.Session.UID(),
363+
"mid": mid,
364+
})
365+
349366
switch d := v.(type) {
350367
case []byte:
351-
logger.Log.Debugf("Type=Response, ID=%d, UID=%s, MID=%d, Data=%dbytes",
352-
a.Session.ID(), a.Session.UID(), mid, len(d))
368+
logger = logger.WithField("bytes", len(d))
353369
default:
354-
logger.Log.Infof("Type=Response, ID=%d, UID=%s, MID=%d, Data=%+v",
355-
a.Session.ID(), a.Session.UID(), mid, v)
370+
logger = logger.WithField("data", fmt.Sprintf("%+v", d))
356371
}
357372

373+
logger.Debugf("responding message to session")
374+
358375
return a.send(pendingMessage{ctx: ctx, typ: message.Response, mid: mid, payload: v, err: err})
359376
}
360377

@@ -368,8 +385,11 @@ func (a *agentImpl) Close() error {
368385
}
369386
a.SetStatus(constants.StatusClosed)
370387

371-
logger.Log.Debugf("Session closed, ID=%d, UID=%s, IP=%s",
372-
a.Session.ID(), a.Session.UID(), a.conn.RemoteAddr())
388+
logger.Log.WithFields(map[string]interface{}{
389+
"session_id": a.Session.ID(),
390+
"uid": a.Session.UID(),
391+
"remote_addr": a.conn.RemoteAddr().String(),
392+
}).Debugf("Session closed")
373393

374394
// prevent closing closed channel
375395
select {
@@ -408,10 +428,48 @@ func (a *agentImpl) Kick(ctx context.Context) error {
408428
// packet encode
409429
p, err := a.encoder.Encode(packet.Kick, nil)
410430
if err != nil {
411-
return err
431+
return fmt.Errorf("agent kick encoding failed: %w", err)
432+
}
433+
if err := a.writeToConnection(ctx, p); err != nil {
434+
// 1. Check for a closed connection (most likely scenario for a "dead connection")
435+
if e.Is(err, net.ErrClosed) {
436+
// Handle specifically: connection was already closed
437+
// This could mean the client disconnected before the kick.
438+
return errors.NewError(err, errors.ErrClientClosedRequest, map[string]string{
439+
"reason": "agent kick failed: connection already closed",
440+
})
441+
}
442+
443+
// 2. Check for a timeout (if you have write deadlines)
444+
if e.Is(err, os.ErrDeadlineExceeded) {
445+
// Handle specifically: write operation timed out
446+
return errors.NewError(err, errors.ErrRequestTimeout, map[string]string{
447+
"reason": "agent kick failed: write timeout",
448+
})
449+
}
450+
451+
// 3. Unwrap OpError to check for specific syscall errors if needed
452+
var opError *net.OpError
453+
if e.As(err, &opError) {
454+
if e.Is(opError.Err, syscall.EPIPE) {
455+
// Handle specifically: broken pipe (often means client disconnected)
456+
return errors.NewError(err, errors.ErrClosedRequest, map[string]string{
457+
"reason": "agent kick failed: write timeout",
458+
})
459+
}
460+
if e.Is(opError.Err, syscall.ECONNRESET) {
461+
// Handle specifically: connection reset by peer
462+
return errors.NewError(err, errors.ErrClientClosedRequest, map[string]string{
463+
"reason": "agent kick failed: connection reset by peer",
464+
})
465+
}
466+
}
467+
468+
return errors.NewError(err, errors.ErrClosedRequest, map[string]string{
469+
"reason": "agent kick message failed",
470+
})
412471
}
413-
_, err = a.conn.Write(p)
414-
return err
472+
return nil
415473
}
416474

417475
// SetLastAt sets the last at to now
@@ -428,7 +486,10 @@ func (a *agentImpl) SetStatus(state int32) {
428486
func (a *agentImpl) Handle() {
429487
defer func() {
430488
a.Close()
431-
logger.Log.Debugf("Session handle goroutine exit, SessionID=%d, UID=%s", a.Session.ID(), a.Session.UID())
489+
logger.Log.WithFields(map[string]interface{}{
490+
"session_id": a.Session.ID(),
491+
"uid": a.Session.UID(),
492+
}).Debugf("Session handle goroutine exit")
432493
}()
433494

434495
go a.write()
@@ -466,7 +527,13 @@ func (a *agentImpl) heartbeat() {
466527
case <-ticker.C:
467528
deadline := time.Now().Add(-2 * a.heartbeatTimeout).Unix()
468529
if atomic.LoadInt64(&a.lastAt) < deadline {
469-
logger.Log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&a.lastAt), deadline)
530+
logger.Log.WithFields(map[string]interface{}{
531+
"session_id": a.Session.ID(),
532+
"uid": a.Session.UID(),
533+
"remote_addr": a.conn.RemoteAddr().String(),
534+
"last_at": atomic.LoadInt64(&a.lastAt),
535+
"deadline": deadline,
536+
}).Debugf("Session heartbeat timeout")
470537
return
471538
}
472539

0 commit comments

Comments
 (0)