diff --git a/agent/agent.go b/agent/agent.go index 6e6c61a1..32005448 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -40,6 +40,7 @@ import ( "github.com/topfreegames/pitaya/v2/constants" "github.com/topfreegames/pitaya/v2/errors" "github.com/topfreegames/pitaya/v2/logger" + "github.com/topfreegames/pitaya/v2/logger/interfaces" "github.com/topfreegames/pitaya/v2/metrics" "github.com/topfreegames/pitaya/v2/protos" "github.com/topfreegames/pitaya/v2/serialize" @@ -86,6 +87,7 @@ type ( metricsReporters []metrics.Reporter serializer serialize.Serializer // message serializer state int32 // current agent state + logger interfaces.Logger } pendingMessage struct { @@ -219,10 +221,11 @@ func newAgent( messageEncoder: messageEncoder, metricsReporters: metricsReporters, sessionPool: sessionPool, + logger: logger.Log, } - // binding session s := sessionPool.NewSession(a, true) + a.logger = a.logger.WithField("session_id", s.ID()) metrics.ReportNumberOfConnectedClients(metricsReporters, sessionPool.GetSessionCount()) a.Session = s return a @@ -322,22 +325,15 @@ func (a *agentImpl) Push(route string, v interface{}) error { return errors.NewError(constants.ErrBrokenPipe, errors.ErrClientClosedRequest) } - logger := logger.Log.WithFields(map[string]interface{}{ - "type": "Push", - "session_id": a.Session.ID(), - "uid": a.Session.UID(), - "route": route, - }) - switch d := v.(type) { case []byte: - logger = logger.WithField("bytes", len(d)) + a.logger.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes", + a.Session.ID(), a.Session.UID(), route, len(d)) default: - logger = logger.WithField("data", fmt.Sprintf("%+v", d)) + a.logger.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v", + a.Session.ID(), a.Session.UID(), route, v) } - logger.Debugf("pushing message to session") - return a.send(pendingMessage{typ: message.Push, route: route, payload: v}) } @@ -356,22 +352,15 @@ func (a *agentImpl) ResponseMID(ctx context.Context, mid uint, v interface{}, is return 
constants.ErrSessionOnNotify } - logger := logger.Log.WithFields(map[string]interface{}{ - "type": "Push", - "session_id": a.Session.ID(), - "uid": a.Session.UID(), - "mid": mid, - }) - switch d := v.(type) { case []byte: - logger = logger.WithField("bytes", len(d)) + a.logger.Debugf("Type=Response, ID=%d, UID=%s, MID=%d, Data=%dbytes", + a.Session.ID(), a.Session.UID(), mid, len(d)) default: - logger = logger.WithField("data", fmt.Sprintf("%+v", d)) + a.logger.Debugf("Type=Response, ID=%d, UID=%s, MID=%d, Data=%+v", + a.Session.ID(), a.Session.UID(), mid, v) } - logger.Debugf("responding message to session") - return a.send(pendingMessage{ctx: ctx, typ: message.Response, mid: mid, payload: v, err: err}) } @@ -385,11 +374,8 @@ func (a *agentImpl) Close() error { } a.SetStatus(constants.StatusClosed) - logger.Log.WithFields(map[string]interface{}{ - "session_id": a.Session.ID(), - "uid": a.Session.UID(), - "remote_addr": a.conn.RemoteAddr().String(), - }).Debugf("Session closed") + a.logger.Debugf("Session closed, ID=%d, UID=%s, IP=%s", + a.Session.ID(), a.Session.UID(), a.conn.RemoteAddr()) // prevent closing closed channel select { @@ -475,10 +461,7 @@ func (a *agentImpl) SetStatus(state int32) { func (a *agentImpl) Handle() { defer func() { a.Close() - logger.Log.WithFields(map[string]interface{}{ - "session_id": a.Session.ID(), - "uid": a.Session.UID(), - }).Debugf("Session handle goroutine exit") + a.logger.Debugf("Session handle goroutine exit, SessionID=%d, UID=%s", a.Session.ID(), a.Session.UID()) }() go a.write() @@ -516,13 +499,7 @@ func (a *agentImpl) heartbeat() { case <-ticker.C: deadline := time.Now().Add(-2 * a.heartbeatTimeout).Unix() if atomic.LoadInt64(&a.lastAt) < deadline { - logger.Log.WithFields(map[string]interface{}{ - "session_id": a.Session.ID(), - "uid": a.Session.UID(), - "remote_addr": a.conn.RemoteAddr().String(), - "last_at": atomic.LoadInt64(&a.lastAt), - "deadline": deadline, - }).Debugf("Session heartbeat timeout") + 
a.logger.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&a.lastAt), deadline) return } @@ -545,7 +522,7 @@ func (a *agentImpl) heartbeat() { func (a *agentImpl) onSessionClosed(s session.Session) { defer func() { if err := recover(); err != nil { - logger.Log.Errorf("pitaya/onSessionClosed: %v", err) + a.logger.Errorf("pitaya/onSessionClosed: %v", err) } }() @@ -589,13 +566,13 @@ func (a *agentImpl) write() { if writeErr != nil { if e.Is(writeErr, os.ErrDeadlineExceeded) { // Log the timeout error but continue processing - logger.Log.Warnf( + a.logger.Warnf( "Context deadline exceeded for write in conn (%s) | session (%s): %s", a.conn.RemoteAddr(), a.Session.UID(), writeErr.Error(), ) } else { err = errors.NewError(writeErr, errors.ErrClosedRequest) - logger.Log.Errorf( + a.logger.Errorf( "Failed to write in conn (%s) | session (%s): %s, agent will close", a.conn.RemoteAddr(), a.Session.UID(), writeErr.Error(), ) @@ -670,12 +647,12 @@ func (a *agentImpl) AnswerWithError(ctx context.Context, mid uint, err error) { } p, e := util.GetErrorPayload(a.serializer, err) if e != nil { - logger.Log.Errorf("error answering the user with an error: %s", e.Error()) + a.logger.Errorf("error answering the user with an error: %s", e.Error()) return } e = a.Session.ResponseMID(ctx, mid, p, true) if e != nil { - logger.Log.Errorf("error answering the user with an error: %s", e.Error()) + a.logger.Errorf("error answering the user with an error: %s", e.Error()) } } @@ -748,14 +725,14 @@ func encodeAndCompress(data interface{}, dataCompression bool) ([]byte, error) { func (a *agentImpl) reportChannelSize() { chSendCapacity := a.messagesBufferSize - len(a.chSend) if chSendCapacity == 0 { - logger.Log.Warnf("chSend is at maximum capacity") + a.logger.Warnf("chSend is at maximum capacity") } for _, mr := range a.metricsReporters { if err := mr.ReportGauge(metrics.ChannelCapacity, map[string]string{"channel": "agent_chsend"}, float64(chSendCapacity)); err != 
nil { - logger.Log.Warnf("failed to report gauge chSend channel capacity: %s", err.Error()) + a.logger.Warnf("failed to report gauge chSend channel capacity: %s", err.Error()) } if err := mr.ReportHistogram(metrics.ChannelCapacityHistogram, map[string]string{"channel": "agent_chsend"}, float64(chSendCapacity)); err != nil { - logger.Log.Warnf("failed to report histogram chSend channel capacity: %s", err.Error()) + a.logger.Warnf("failed to report histogram chSend channel capacity: %s", err.Error()) } } } diff --git a/agent/agent_test.go b/agent/agent_test.go index a2a9b343..370e4e65 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -47,6 +47,7 @@ import ( pcontext "github.com/topfreegames/pitaya/v2/context" e "github.com/topfreegames/pitaya/v2/errors" "github.com/topfreegames/pitaya/v2/helpers" + "github.com/topfreegames/pitaya/v2/logger" "github.com/topfreegames/pitaya/v2/metrics" metricsmocks "github.com/topfreegames/pitaya/v2/metrics/mocks" "github.com/topfreegames/pitaya/v2/mocks" @@ -385,6 +386,7 @@ func TestAgentSendSerializeErr(t *testing.T) { messageEncoder: messageEncoder, metricsReporters: mockMetricsReporters, Session: sessionPool.NewSession(nil, true), + logger: logger.Log, } ctx := getCtxWithRequestKeys() diff --git a/service/remote.go b/service/remote.go index 95bb3db4..920a354d 100644 --- a/service/remote.go +++ b/service/remote.go @@ -113,17 +113,21 @@ func (r *RemoteService) remoteProcess( route *route.Route, msg *message.Message, ) { + log := logger.Log.WithFields(map[string]interface{}{ + "route": route.String(), + "uid": a.GetSession().UID(), + }) res, err := r.remoteCall(ctx, server, protos.RPCType_Sys, route, a.GetSession(), msg) switch msg.Type { case message.Request: if err != nil { - logger.Log.Errorf("Failed to process remote server: %s", err.Error()) + log.Errorf("Failed to process remote server: %s", err.Error()) a.AnswerWithError(ctx, msg.ID, err) return } err := a.GetSession().ResponseMID(ctx, msg.ID, res.Data) if err != 
nil { - logger.Log.Errorf("Failed to respond to remote server: %s", err.Error()) + log.Errorf("Failed to respond to remote server: %s", err.Error()) a.AnswerWithError(ctx, msg.ID, err) } case message.Notify: @@ -132,7 +136,7 @@ func (r *RemoteService) remoteProcess( err = errors.New(res.Error.GetMsg()) } if err != nil { - logger.Log.Errorf("error while sending a notify to server: %s", err.Error()) + log.Errorf("error while sending a notify to server: %s", err.Error()) } } } diff --git a/service/remote_test.go b/service/remote_test.go index d6ab0fbd..9938c644 100644 --- a/service/remote_test.go +++ b/service/remote_test.go @@ -741,6 +741,7 @@ func TestRemoteServiceRemoteProcess(t *testing.T) { } else if expectedMsg.Type != message.Notify { mockSession.EXPECT().ResponseMID(ctx, expectedMsg.ID, gomock.Any()).Return(table.responseMIDErr) } + mockSession.EXPECT().UID() if table.responseMIDErr != nil { mockAgent.EXPECT().AnswerWithError(ctx, expectedMsg.ID, table.responseMIDErr) diff --git a/session/session.go b/session/session.go index b4433675..1e4863f3 100644 --- a/session/session.go +++ b/session/session.go @@ -34,6 +34,7 @@ import ( nats "github.com/nats-io/nats.go" "github.com/topfreegames/pitaya/v2/constants" "github.com/topfreegames/pitaya/v2/logger" + "github.com/topfreegames/pitaya/v2/logger/interfaces" "github.com/topfreegames/pitaya/v2/networkentity" "github.com/topfreegames/pitaya/v2/protos" ) @@ -101,6 +102,7 @@ type sessionImpl struct { Subscriptions []*nats.Subscription // subscription created on bind when using nats rpc server requestsInFlight ReqInFlight // whether the session is waiting from a response from a remote pool *sessionPoolImpl + logger interfaces.Logger // logger instance for this session } type ReqInFlight struct { @@ -192,13 +194,16 @@ func (pool *sessionPoolImpl) NewSession(entity networkentity.NetworkEntity, fron IsFrontend: frontend, pool: pool, requestsInFlight: ReqInFlight{m: make(map[string]string)}, + logger: logger.Log, } if 
frontend { pool.sessionsByID.Store(s.id, s) atomic.AddInt64(&pool.SessionCount, 1) } + s.logger = s.logger.WithField("session_id", s.id) if len(UID) > 0 { s.uid = UID[0] + s.logger = s.logger.WithField("uid", s.uid) } return s } @@ -435,6 +440,7 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error { } if s.UID() != "" { + s.logger.Debugf("Error trying to bind UID %s. A UID is already bound in this session", uid) return constants.ErrSessionAlreadyBound } @@ -442,6 +448,7 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error { for _, cb := range s.pool.sessionBindCallbacks { err := cb(ctx, s) if err != nil { + s.logger.Error("Error running session bind callback. Removing uid from session") s.uid = "" return err } @@ -451,6 +458,7 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error { if s.IsFrontend { // If a session with the same UID already exists in this frontend server, close it if val, ok := s.pool.sessionsByUID.Load(uid); ok { + s.logger.Warnf("A session for this UID %s already existed in this frontend, on session ID %v closing it", uid, val.(Session).ID()) val.(Session).Close() } s.pool.sessionsByUID.Store(uid, s) @@ -459,7 +467,7 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error { // is not the frontend server that received the user request err := s.bindInFront(ctx) if err != nil { - logger.Log.Error("error while trying to push session to front: ", err) + s.logger.Error("error while trying to push session to front: ", err) s.uid = "" return err } @@ -504,6 +512,7 @@ func (s *sessionImpl) Close() { if _, ok := s.pool.sessionsByID.LoadAndDelete(s.ID()); ok { atomic.AddInt64(&s.pool.SessionCount, -1) } + s.logger.Debug("Closing session") // Only remove session by UID if the session ID matches the one being closed. 
This avoids problems with removing a valid session after the user has already reconnected before this session's heartbeat times out if val, ok := s.pool.sessionsByUID.Load(s.UID()); ok { if (val.(Session)).ID() == s.ID() { @@ -516,9 +525,9 @@ func (s *sessionImpl) Close() { for _, sub := range s.Subscriptions { err := sub.Drain() if err != nil { - logger.Log.Errorf("error unsubscribing to user's messages channel: %s, this can cause performance and leak issues", err.Error()) + s.logger.Errorf("error unsubscribing to user's messages channel: %s, this can cause performance and leak issues", err.Error()) } else { - logger.Log.Debugf("successfully unsubscribed to user's %s messages channel", s.UID()) + s.logger.Debug("successfully unsubscribed to user's messages channel") } } } @@ -849,6 +858,7 @@ func (s *sessionImpl) ValidateHandshake(data *HandshakeData) error { } func (s *sessionImpl) sendRequestToFront(ctx context.Context, route string, includeData bool) error { + log := s.logger.WithField("route", route) sessionData := &protos.Session{ Id: s.frontendSessionID, Uid: s.uid, @@ -864,7 +874,7 @@ func (s *sessionImpl) sendRequestToFront(ctx context.Context, route string, incl if err != nil { return err } - logger.Log.Debugf("%s Got response: %+v", route, res) + log.Debugf("Got response: %+v", res) return nil } diff --git a/util/util.go b/util/util.go index be8bdba5..eec5073b 100644 --- a/util/util.go +++ b/util/util.go @@ -166,10 +166,10 @@ func CtxWithDefaultLogger(ctx context.Context, route, userID string) context.Con requestID := pcontext.GetFromPropagateCtx(ctx, constants.RequestIDKey) if rID, ok := requestID.(string); ok { if rID == "" { - requestID = nuid.New() + requestID = nuid.New().Next() } } else { - requestID = nuid.New() + requestID = nuid.New().Next() } defaultLogger := logger.Log.WithFields( map[string]interface{}{