diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 1cf39530..42420b49 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -20,7 +20,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: '1.24' + go-version: '1.25' - name: Download dependencies run: go mod download unit-test: @@ -33,7 +33,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: '1.24' + go-version: '1.25' - name: Setup dependencies env: GO111MODULE: auto @@ -54,7 +54,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: '1.24' + go-version: '1.25' - name: Run tests run: make e2e-test-nats e2e-test-grpc: @@ -67,6 +67,6 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: '1.24' + go-version: '1.25' - name: Run tests run: make e2e-test-grpc diff --git a/go.mod b/go.mod index ad417748..87c3b27c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/topfreegames/pitaya/v3 -go 1.24.0 +go 1.25.4 require ( github.com/DataDog/datadog-go v4.8.3+incompatible @@ -108,7 +108,6 @@ require ( go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect - go.uber.org/automaxprocs v1.6.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.45.0 // indirect diff --git a/go.sum b/go.sum index 21d2c4c1..5056f3ab 100644 --- a/go.sum +++ b/go.sum @@ -131,8 +131,6 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs= -github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -237,8 +235,6 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -265,8 +261,6 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= -github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= -github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk= github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= @@ -278,20 +272,12 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= -github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= -github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4= -github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s= github.com/nats-io/nats-server/v2 v2.12.2 h1:4TEQd0Y4zvcW0IsVxjlXnRso1hBkQl3TS0BI+SxgPhE= github.com/nats-io/nats-server/v2 v2.12.2/go.mod h1:j1AAttYeu7WnvD8HLJ+WWKNMSyxsqmZ160pNtCQRMyE= -github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI= -github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM= github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM= github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= -github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= -github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -417,8 +403,6 @@ go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= -go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= @@ -597,8 +581,6 @@ golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= -golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -746,8 +728,6 @@ google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTp google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= -google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= -google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/go.work.sum b/go.work.sum index 72bb0f0e..9f17961b 100644 --- a/go.work.sum +++ b/go.work.sum @@ -550,6 +550,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1: go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0/go.mod h1:GAXRxmLJcVM3u22IjTg74zWBrRCKq8BnOqUVLodpcpw= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.38.0/go.mod h1:ZQM5lAJpOsKnYagGg/zV2krVqTtaVdYdDkhMoX6Oalg= go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index d8a02646..73219c63 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -39,6 +39,7 @@ import ( "github.com/topfreegames/pitaya/v3/pkg/constants" "github.com/topfreegames/pitaya/v3/pkg/errors" "github.com/topfreegames/pitaya/v3/pkg/logger" + "github.com/topfreegames/pitaya/v3/pkg/logger/interfaces" "github.com/topfreegames/pitaya/v3/pkg/metrics" "github.com/topfreegames/pitaya/v3/pkg/protos" "github.com/topfreegames/pitaya/v3/pkg/serialize" @@ -86,6 +87,7 @@ type ( metricsReporters []metrics.Reporter serializer serialize.Serializer // message serializer state int32 // current agent state + logger interfaces.Logger } pendingMessage struct { @@ -218,10 +220,11 @@ func newAgent( messageEncoder: messageEncoder, metricsReporters: metricsReporters, sessionPool: sessionPool, + logger: logger.Log, } - // binding session s := sessionPool.NewSession(a, true) + a.logger = a.logger.WithField("session_id", s.ID()) metrics.ReportNumberOfConnectedClients(metricsReporters, sessionPool.GetSessionCount()) a.Session = s return a @@ -323,10 +326,10 @@ func (a *agentImpl) Push(route string, v interface{}) error { switch d := v.(type) { case []byte: - logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes", + a.logger.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes", a.Session.ID(), a.Session.UID(), route, len(d)) default: - logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v", + a.logger.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v", a.Session.ID(), a.Session.UID(), route, v) } return a.send(pendingMessage{typ: message.Push, route: route, payload: v}) @@ -349,10 +352,10 @@ func (a *agentImpl) ResponseMID(ctx context.Context, mid uint, v interface{}, is switch d := v.(type) { case []byte: - logger.Log.Debugf("Type=Response, ID=%d, UID=%s, MID=%d, Data=%dbytes", + a.logger.Debugf("Type=Response, ID=%d, UID=%s, MID=%d, Data=%dbytes", a.Session.ID(), a.Session.UID(), mid, len(d)) default: - logger.Log.Infof("Type=Response, ID=%d, UID=%s, MID=%d, Data=%+v", + a.logger.Infof("Type=Response, ID=%d, UID=%s, MID=%d, Data=%+v", a.Session.ID(), a.Session.UID(), mid, v) } @@ -369,7 +372,7 @@ func (a *agentImpl) Close() error { } a.SetStatus(constants.StatusClosed) - logger.Log.Debugf("Session closed, ID=%d, UID=%s, IP=%s", + a.logger.Debugf("Session closed, ID=%d, UID=%s, IP=%s", a.Session.ID(), a.Session.UID(), a.conn.RemoteAddr()) // prevent closing closed channel @@ -429,7 +432,7 @@ func (a *agentImpl) SetStatus(state int32) { func (a *agentImpl) Handle() { defer func() { a.Close() - logger.Log.Debugf("Session handle goroutine exit, SessionID=%d, UID=%s", a.Session.ID(), a.Session.UID()) + a.logger.Debugf("Session handle goroutine exit, SessionID=%d, UID=%s", a.Session.ID(), a.Session.UID()) }() go a.write() @@ -467,7 +470,7 @@ func (a *agentImpl) heartbeat() { case <-ticker.C: deadline := time.Now().Add(-2 * a.heartbeatTimeout).Unix() if atomic.LoadInt64(&a.lastAt) < deadline { - logger.Log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&a.lastAt), deadline) + a.logger.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&a.lastAt), deadline) return } @@ -490,7 +493,7 @@ func (a *agentImpl) heartbeat() { func (a *agentImpl) onSessionClosed(s session.Session) { defer func() { if err := recover(); err != nil { - logger.Log.Errorf("pitaya/onSessionClosed: %v", err) + a.logger.Errorf("pitaya/onSessionClosed: %v", err) } }() @@ -560,8 +563,8 @@ func (a *agentImpl) write() { func (a *agentImpl) writeToConnection(ctx context.Context, data []byte) error { span := createConnectionSpan(ctx, a.conn, "conn write") - a.conn.SetWriteDeadline(time.Now().Add(a.writeTimeout)) - _, writeErr := a.conn.Write(data) + a.conn.SetWriteDeadline(time.Now().Add(a.writeTimeout)) + _, writeErr := a.conn.Write(data) if span != nil { defer span.End() @@ -617,12 +620,12 @@ func (a *agentImpl) AnswerWithError(ctx context.Context, mid uint, err error) { } p, e := util.GetErrorPayload(a.serializer, err) if e != nil { - logger.Log.Errorf("error answering the user with an error: %s", e.Error()) + a.logger.Errorf("error answering the user with an error: %s", e.Error()) return } e = a.Session.ResponseMID(ctx, mid, p, true) if e != nil { - logger.Log.Errorf("error answering the user with an error: %s", e.Error()) + a.logger.Errorf("error answering the user with an error: %s", e.Error()) } } @@ -695,7 +698,7 @@ func encodeAndCompress(data interface{}, dataCompression bool) ([]byte, error) { func (a *agentImpl) reportChannelSize() { chSendCapacity := a.messagesBufferSize - len(a.chSend) if chSendCapacity == 0 { - logger.Log.Warnf("chSend is at maximum capacity") + a.logger.Warnf("chSend is at maximum capacity") } for _, mr := range a.metricsReporters { if err := mr.ReportHistogram(metrics.ChannelCapacity, map[string]string{"channel": "agent_chsend"}, float64(chSendCapacity)); err != nil { diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 97ac99e5..10a54c7d 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -45,6 +45,7 @@ import ( pcontext "github.com/topfreegames/pitaya/v3/pkg/context" e "github.com/topfreegames/pitaya/v3/pkg/errors" "github.com/topfreegames/pitaya/v3/pkg/helpers" + "github.com/topfreegames/pitaya/v3/pkg/logger" "github.com/topfreegames/pitaya/v3/pkg/metrics" metricsmocks "github.com/topfreegames/pitaya/v3/pkg/metrics/mocks" "github.com/topfreegames/pitaya/v3/pkg/mocks" @@ -227,13 +228,19 @@ func TestAgentSendSerializeErr(t *testing.T) { ag := &agentImpl{ // avoid heartbeat and handshake to fully test serialize conn: mockConn, chSend: make(chan pendingWrite, 10), + chDie: make(chan struct{}), + chStopWrite: make(chan struct{}), + chStopHeartbeat: make(chan struct{}), encoder: mockEncoder, heartbeatTimeout: time.Second, + writeTimeout: time.Second, lastAt: time.Now().Unix(), serializer: mockSerializer, messageEncoder: messageEncoder, metricsReporters: mockMetricsReporters, Session: sessionPool.NewSession(nil, true), + logger: logger.Log, + sessionPool: sessionPool, } ctx := getCtxWithRequestKeys() @@ -269,15 +276,19 @@ func TestAgentSendSerializeErr(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - mockConn.EXPECT().RemoteAddr().Times(2).Return(&mockAddr{}) + mockConn.EXPECT().RemoteAddr().AnyTimes().Return(&mockAddr{}) mockConn.EXPECT().SetWriteDeadline(gomock.Any()).Return(nil) mockConn.EXPECT().Write(expectedPacket).Do(func(b []byte) { wg.Done() }) + mockConn.EXPECT().Close().Return(nil) go ag.write() mockMetricsReporter.EXPECT().ReportHistogram(gomock.Any(), gomock.Any(), gomock.Any()) + mockMetricsReporter.EXPECT().ReportGauge(gomock.Any(), gomock.Any(), gomock.Any()) ag.send(expected) wg.Wait() + // Close the agent to properly clean up the write goroutine + ag.Close() } diff --git a/pkg/service/remote.go b/pkg/service/remote.go index d89b15c3..c8c44127 100644 --- a/pkg/service/remote.go +++ b/pkg/service/remote.go @@ -112,17 +112,21 @@ func (r *RemoteService) remoteProcess( route *route.Route, msg *message.Message, ) { + log := logger.Log.WithFields(map[string]interface{}{ + "route": route.String(), + "uid": a.GetSession().UID(), + }) res, err := r.remoteCall(ctx, server, protos.RPCType_Sys, route, a.GetSession(), msg) switch msg.Type { case message.Request: if err != nil { - logger.Log.Errorf("Failed to process remote server: %s", err.Error()) + log.Errorf("Failed to process remote server: %s", err.Error()) a.AnswerWithError(ctx, msg.ID, err) return } err := a.GetSession().ResponseMID(ctx, msg.ID, res.Data) if err != nil { - logger.Log.Errorf("Failed to respond to remote server: %s", err.Error()) + log.Errorf("Failed to respond to remote server: %s", err.Error()) a.AnswerWithError(ctx, msg.ID, err) } case message.Notify: @@ -131,7 +135,7 @@ func (r *RemoteService) remoteProcess( err = errors.New(res.Error.GetMsg()) } if err != nil { - logger.Log.Errorf("error while sending a notify to server: %s", err.Error()) + log.Errorf("error while sending a notify to server: %s", err.Error()) } } } diff --git a/pkg/service/remote_test.go b/pkg/service/remote_test.go index 0f8d958d..ade184b4 100644 --- a/pkg/service/remote_test.go +++ b/pkg/service/remote_test.go @@ -741,6 +741,7 @@ func TestRemoteServiceRemoteProcess(t *testing.T) { } else if expectedMsg.Type != message.Notify { mockSession.EXPECT().ResponseMID(ctx, expectedMsg.ID, gomock.Any()).Return(table.responseMIDErr) } + mockSession.EXPECT().UID() if table.responseMIDErr != nil { mockAgent.EXPECT().AnswerWithError(ctx, expectedMsg.ID, table.responseMIDErr) diff --git a/pkg/session/session.go b/pkg/session/session.go index 8cc82952..acd5fa68 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -34,6 +34,7 @@ import ( nats "github.com/nats-io/nats.go" "github.com/topfreegames/pitaya/v3/pkg/constants" "github.com/topfreegames/pitaya/v3/pkg/logger" + "github.com/topfreegames/pitaya/v3/pkg/logger/interfaces" "github.com/topfreegames/pitaya/v3/pkg/networkentity" "github.com/topfreegames/pitaya/v3/pkg/protos" ) @@ -101,6 +102,7 @@ type sessionImpl struct { Subscriptions []*nats.Subscription // subscription created on bind when using nats rpc server requestsInFlight ReqInFlight // whether the session is waiting from a response from a remote pool *sessionPoolImpl + logger interfaces.Logger // logger instance for this session } type ReqInFlight struct { @@ -192,13 +194,16 @@ func (pool *sessionPoolImpl) NewSession(entity networkentity.NetworkEntity, fron IsFrontend: frontend, pool: pool, requestsInFlight: ReqInFlight{m: make(map[string]string)}, + logger: logger.Log, } if frontend { pool.sessionsByID.Store(s.id, s) atomic.AddInt64(&pool.SessionCount, 1) } + s.logger = s.logger.WithField("session_id",s.id) if len(UID) > 0 { s.uid = UID[0] + s.logger = s.logger.WithField("uid",s.uid) } return s } @@ -435,6 +440,7 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error { } if s.UID() != "" { + s.logger.Debugf("Error trying to bind UID %s. A UID is already bound in this session", uid) return constants.ErrSessionAlreadyBound } @@ -442,6 +448,7 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error { for _, cb := range s.pool.sessionBindCallbacks { err := cb(ctx, s) if err != nil { + s.logger.Error("Error running session bind callback. Removing uid from session") s.uid = "" return err } @@ -451,6 +458,7 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error { if s.IsFrontend { // If a session with the same UID already exists in this frontend server, close it if val, ok := s.pool.sessionsByUID.Load(uid); ok { + s.logger.Warn("A session for this UID %s already existed in this frontend, on session ID %v closing it", val.(Session).ID()) val.(Session).Close() } s.pool.sessionsByUID.Store(uid, s) @@ -459,7 +467,7 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error { // is not the frontend server that received the user request err := s.bindInFront(ctx) if err != nil { - logger.Log.Error("error while trying to push session to front: ", err) + s.logger.Error("error while trying to push session to front: ", err) s.uid = "" return err } @@ -501,6 +509,7 @@ func (s *sessionImpl) OnClose(c func()) error { func (s *sessionImpl) Close() { atomic.AddInt64(&s.pool.SessionCount, -1) s.pool.sessionsByID.Delete(s.ID()) + s.logger.Debug("Closing session") // Only remove session by UID if the session ID matches the one being closed. This avoids problems with removing a valid session after the user has already reconnected before this session's heartbeat times out if val, ok := s.pool.sessionsByUID.Load(s.UID()); ok { if (val.(Session)).ID() == s.ID() { @@ -513,9 +522,9 @@ func (s *sessionImpl) Close() { for _, sub := range s.Subscriptions { err := sub.Drain() if err != nil { - logger.Log.Errorf("error unsubscribing to user's messages channel: %s, this can cause performance and leak issues", err.Error()) + s.logger.Errorf("error unsubscribing to user's messages channel: %s, this can cause performance and leak issues", err.Error()) } else { - logger.Log.Debugf("successfully unsubscribed to user's %s messages channel", s.UID()) + s.logger.Debug("successfully unsubscribed to user's messages channel") } } } @@ -846,6 +855,7 @@ func (s *sessionImpl) ValidateHandshake(data *HandshakeData) error { } func (s *sessionImpl) sendRequestToFront(ctx context.Context, route string, includeData bool) error { + log := s.logger.WithField("route", route) sessionData := &protos.Session{ Id: s.frontendSessionID, Uid: s.uid, @@ -861,7 +871,7 @@ func (s *sessionImpl) sendRequestToFront(ctx context.Context, route string, incl if err != nil { return err } - logger.Log.Debugf("%s Got response: %+v", route, res) + log.Debugf("Got response: %+v", res) return nil } diff --git a/pkg/util/util.go b/pkg/util/util.go index 6e5def55..bbcdf572 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -167,10 +167,10 @@ func CtxWithDefaultLogger(ctx context.Context, route, userID string) context.Con requestID := pcontext.GetFromPropagateCtx(ctx, constants.RequestIDKey) if rID, ok := requestID.(string); ok { if rID == "" { - requestID = nuid.New() + requestID = nuid.New().Next() } } else { - requestID = nuid.New() + requestID = nuid.New().Next() } defaultLogger := logger.Log.WithFields( map[string]interface{}{