diff --git a/main.go b/main.go index 5a3974d14..7e6c9bfd5 100644 --- a/main.go +++ b/main.go @@ -218,7 +218,7 @@ func main() { leaderboardScheduler.Start(runtime) - pipeline := server.NewPipeline(logger, config, db, jsonpbMarshaler, jsonpbUnmarshaler, sessionRegistry, statusRegistry, matchRegistry, partyRegistry, matchmaker, tracker, router, runtime) + pipeline := server.NewPipeline(logger, config, db, jsonpbMarshaler, jsonpbUnmarshaler, sessionRegistry, statusRegistry, matchRegistry, partyRegistry, matchmaker, tracker, router, runtime, metrics) statusHandler := server.NewLocalStatusHandler(logger, sessionRegistry, matchRegistry, partyRegistry, tracker, metrics, config.GetName(), createTime) telemetryEnabled := os.Getenv("NAKAMA_TELEMETRY") != "0" diff --git a/server/api_leaderboard_test.go b/server/api_leaderboard_test.go index 256d1c92e..ed618c150 100644 --- a/server/api_leaderboard_test.go +++ b/server/api_leaderboard_test.go @@ -113,7 +113,7 @@ nk.leaderboard_create(%q, %t, %q, %q, reset, metadata, %t) tracker := &LocalTracker{} sessionCache := NewLocalSessionCache(1_000, 3_600) - pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, nil, nil, nil, nil, nil, tracker, router, runtime) + pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, nil, nil, nil, nil, nil, tracker, router, runtime, metrics) apiServer := StartApiServer(logger, logger, db, protojsonMarshaler, protojsonUnmarshaler, cfg, "3.0.0", nil, nil, rtData.leaderboardCache, diff --git a/server/api_test.go b/server/api_test.go index fda6d6e09..5db008f4e 100644 --- a/server/api_test.go +++ b/server/api_test.go @@ -226,7 +226,7 @@ func NewAPIServer(t *testing.T, runtime *Runtime) (*ApiServer, *Pipeline) { sessionCache := NewLocalSessionCache(3_600, 7_200) sessionRegistry := NewLocalSessionRegistry(metrics) tracker := &LocalTracker{sessionRegistry: sessionRegistry} - pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, sessionRegistry, nil, nil, nil, nil, tracker, router, runtime) + pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, sessionRegistry, nil, nil, nil, nil, tracker, router, runtime, metrics) apiServer := StartApiServer(logger, logger, db, protojsonMarshaler, protojsonUnmarshaler, cfg, "3.0.0", nil, storageIdx, nil, nil, sessionRegistry, sessionCache, nil, nil, nil, nil, tracker, router, nil, metrics, pipeline, runtime) WaitForSocket(nil, cfg) diff --git a/server/match_common_test.go b/server/match_common_test.go index cf1f189c2..02fd968b6 100644 --- a/server/match_common_test.go +++ b/server/match_common_test.go @@ -28,7 +28,6 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "google.golang.org/grpc/codes" ) // loggerForTest allows for easily adjusting log output produced by tests in one place @@ -54,7 +53,7 @@ func createTestMatchRegistry(t fatalable, logger *zap.Logger) (*LocalMatchRegist cfg.GetMatch().LabelUpdateIntervalMs = int(time.Hour / time.Millisecond) messageRouter := &testMessageRouter{} matchRegistry := NewLocalMatchRegistry(logger, logger, cfg, &testSessionRegistry{}, &testTracker{}, - messageRouter, &testMetrics{}, "node") + messageRouter, metrics, "node") mp := NewMatchProvider() mp.RegisterCreateFn("go", @@ -151,42 +150,6 @@ func (m *testMatch) MatchSignal(ctx context.Context, logger runtime.Logger, db * return state, "signal received: " + data } -// testMetrics implements the Metrics interface and does nothing -type testMetrics struct{} - -func (m *testMetrics) Stop(logger *zap.Logger) {} -func (m *testMetrics) SnapshotLatencyMs() float64 { return 0 } -func (m *testMetrics) SnapshotRateSec() float64 { return 0 } -func (m *testMetrics) SnapshotRecvKbSec() float64 { return 0 } -func (m *testMetrics) SnapshotSentKbSec() float64 { return 0 } -func (m *testMetrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes int64, rpcCode codes.Code) { -} - -func (m *testMetrics) ApiRpc(id string, elapsed time.Duration, recvBytes, sentBytes int64, rpcCode codes.Code) { -} -func (m *testMetrics) ApiBefore(name string, elapsed time.Duration, isErr bool) {} -func (m *testMetrics) ApiAfter(name string, elapsed time.Duration, isErr bool) {} -func (m *testMetrics) Message(recvBytes int64, isErr bool) {} -func (m *testMetrics) MessageBytesSent(sentBytes int64) {} -func (m *testMetrics) GaugeRuntimes(value float64) {} -func (m *testMetrics) GaugeLuaRuntimes(value float64) {} -func (m *testMetrics) GaugeJsRuntimes(value float64) {} -func (m *testMetrics) GaugeAuthoritativeMatches(value float64) {} -func (m *testMetrics) GaugeParties(value float64) {} -func (m *testMetrics) GaugeStorageIndexEntries(indexName string, value float64) {} -func (m *testMetrics) CountDroppedEvents(delta int64) {} -func (m *testMetrics) CountWebsocketOpened(delta int64) {} -func (m *testMetrics) CountWebsocketClosed(delta int64) {} -func (m *testMetrics) CountUntaggedGrpcStatsCalls(delta int64) {} -func (m *testMetrics) GaugeSessions(value float64) {} -func (m *testMetrics) GaugePresences(value float64) {} -func (m *testMetrics) Matchmaker(tickets, activeTickets float64, processTime time.Duration) {} -func (m *testMetrics) PresenceEvent(dequeueElapsed, processElapsed time.Duration) {} -func (m *testMetrics) StorageWriteRejectCount(tags map[string]string, delta int64) {} -func (m *testMetrics) CustomCounter(name string, tags map[string]string, delta int64) {} -func (m *testMetrics) CustomGauge(name string, tags map[string]string, value float64) {} -func (m *testMetrics) CustomTimer(name string, tags map[string]string, value time.Duration) {} - // testMessageRouter is used for testing, and can fire a callback // when the SendToPresenceIDs method is invoked type testMessageRouter struct { diff --git a/server/matchmaker_test.go b/server/matchmaker_test.go index 2aad83e45..bfd53d09c 100644 --- a/server/matchmaker_test.go +++ b/server/matchmaker_test.go @@ -1639,7 +1639,6 @@ func createTestMatchmaker(t fatalable, logger *zap.Logger, tickerActive bool, me } sessionRegistry := &testSessionRegistry{} tracker := &testTracker{} - metrics := &testMetrics{} jsonpbMarshaler := &protojson.MarshalOptions{ UseEnumNumbers: true, diff --git a/server/metrics.go b/server/metrics.go index 11fb5d298..b5e9cc751 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -44,6 +44,8 @@ type Metrics interface { ApiBefore(name string, elapsed time.Duration, isErr bool) ApiAfter(name string, elapsed time.Duration, isErr bool) + WsRpc(id string, elapsed time.Duration, recvBytes, sentBytes int64, rpcCode codes.Code) + Message(recvBytes int64, isErr bool) MessageBytesSent(sentBytes int64) @@ -269,7 +271,7 @@ func (m *LocalMetrics) Api(id string, elapsed time.Duration, recvBytes, sentByte } // New metrics format - labels := map[string]string{"rpc_id": id, "rpc_code": rpcCode.String()} + labels := map[string]string{"transport": "api", "rpc_id": id, "rpc_code": rpcCode.String()} labeledScope := m.PrometheusScope.Tagged(labels) labeledScope.Counter("count").Inc(1) labeledScope.Counter("recv_bytes").Inc(recvBytes) @@ -308,7 +310,46 @@ func (m *LocalMetrics) ApiRpc(id string, elapsed time.Duration, recvBytes, sentB } // New metrics format - labels := map[string]string{"transport": "request", "rpc_id": id, "rpc_code": rpcCode.String()} + labels := map[string]string{"transport": "api_rpc", "rpc_id": id, "rpc_code": rpcCode.String()} + labeledScope := m.PrometheusScope.Tagged(labels) + labeledScope.Counter("count").Inc(1) + labeledScope.Counter("recv_bytes").Inc(recvBytes) + labeledScope.Counter("sent_bytes").Inc(sentBytes) + labeledScope.Timer("latency").Record(elapsed) +} + +func (m *LocalMetrics) WsRpc(id string, elapsed time.Duration, recvBytes, sentBytes int64, rpcCode codes.Code) { + // Increment ongoing statistics for current measurement window. + m.currentMsTotal.Add(int64(elapsed / time.Millisecond)) + m.currentReqCount.Inc() + m.currentRecvBytes.Add(recvBytes) + m.currentSentBytes.Add(sentBytes) + + // Global stats. + m.PrometheusScope.Counter("overall_count").Inc(1) + m.PrometheusScope.Counter("overall_request_count").Inc(1) + m.PrometheusScope.Counter("overall_recv_bytes").Inc(recvBytes) + m.PrometheusScope.Counter("overall_request_recv_bytes").Inc(recvBytes) + m.PrometheusScope.Counter("overall_sent_bytes").Inc(sentBytes) + m.PrometheusScope.Counter("overall_request_sent_bytes").Inc(sentBytes) + m.PrometheusScope.Timer("overall_latency_ms").Record(elapsed) + + // Per-endpoint stats. + taggedScope := m.PrometheusScope.Tagged(map[string]string{"rpc_id": id}) + taggedScope.Counter("Rpc_count").Inc(1) + taggedScope.Counter("Rpc_recv_bytes").Inc(recvBytes) + taggedScope.Counter("Rpc_sent_bytes").Inc(sentBytes) + taggedScope.Timer("Rpc_latency_ms").Record(elapsed) + + // Error stats if applicable. + if rpcCode != codes.OK { + m.PrometheusScope.Counter("overall_errors").Inc(1) + m.PrometheusScope.Counter("overall_request_errors").Inc(1) + taggedScope.Counter("Rpc_errors").Inc(1) + } + + // New metrics format + labels := map[string]string{"transport": "ws_rpc", "rpc_id": id, "rpc_code": rpcCode.String()} labeledScope := m.PrometheusScope.Tagged(labels) labeledScope.Counter("count").Inc(1) labeledScope.Counter("recv_bytes").Inc(recvBytes) diff --git a/server/party_handler_test.go b/server/party_handler_test.go index 6ff6bf854..10fff9554 100644 --- a/server/party_handler_test.go +++ b/server/party_handler_test.go @@ -71,7 +71,7 @@ func createTestPartyHandler(t *testing.T, logger *zap.Logger, presence *rtapi.Us dmr := DummyMessageRouter{} - pr := NewLocalPartyRegistry(context.Background(), logger, logger, cfg, node, &testMetrics{}) + pr := NewLocalPartyRegistry(context.Background(), logger, logger, cfg, node, metrics) pr.Init(mm, &tt, &tsm, &dmr) ph := NewPartyHandler(logger, pr, mm, &tt, &tsm, &dmr, uuid.UUID{}, node, true, 10, presence) return ph, cleanup diff --git a/server/pipeline.go b/server/pipeline.go index d1aed71a3..d84ed5530 100644 --- a/server/pipeline.go +++ b/server/pipeline.go @@ -41,9 +41,10 @@ type Pipeline struct { router MessageRouter runtime *Runtime node string + metrics Metrics } -func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, sessionRegistry SessionRegistry, statusRegistry StatusRegistry, matchRegistry MatchRegistry, partyRegistry PartyRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, runtime *Runtime) *Pipeline { +func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, sessionRegistry SessionRegistry, statusRegistry StatusRegistry, matchRegistry MatchRegistry, partyRegistry PartyRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, runtime *Runtime, metrics *LocalMetrics) *Pipeline { return &Pipeline{ logger: logger, config: config, @@ -59,6 +60,7 @@ func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, protojsonMarshal router: router, runtime: runtime, node: config.GetName(), + metrics: metrics, } } @@ -155,7 +157,7 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, in *rtapi switch in.Message.(type) { case *rtapi.Envelope_Rpc: - // No before/after hooks on RPC. + // No before/after hooks on RPC. RPC handles its own metrics in pipeline_rpc.go. default: messageName = fmt.Sprintf("%T", in.Message) messageNameID = strings.ToLower(messageName) diff --git a/server/pipeline_rpc.go b/server/pipeline_rpc.go index 1d50fe1aa..55cafb64d 100644 --- a/server/pipeline_rpc.go +++ b/server/pipeline_rpc.go @@ -17,10 +17,12 @@ package server import ( "context" "strings" + "time" "github.com/heroiclabs/nakama-common/api" "github.com/heroiclabs/nakama-common/rtapi" "go.uber.org/zap" + "google.golang.org/grpc/codes" ) func (p *Pipeline) rpc(ctx context.Context, logger *zap.Logger, session Session, envelope *rtapi.Envelope) (bool, *rtapi.Envelope) { @@ -51,15 +53,27 @@ func (p *Pipeline) rpc(ctx context.Context, logger *zap.Logger, session Session, } } - result, fnErr, _ := fn(session.Context(), nil, nil, traceID, session.UserID().String(), session.Username(), session.Vars(), session.Expiry(), session.ID().String(), session.ClientIP(), session.ClientPort(), session.Lang(), rpcMessage.Payload) + start := time.Now().UTC() + rpcCode := codes.Unknown + recvBytes := int64(len([]byte(rpcMessage.Payload))) + var sentBytes int64 + defer func() { + p.metrics.WsRpc(id, time.Since(start), recvBytes, sentBytes, rpcCode) + }() + + result, fnErr, code := fn(session.Context(), nil, nil, traceID, session.UserID().String(), session.Username(), session.Vars(), session.Expiry(), session.ID().String(), session.ClientIP(), session.ClientPort(), session.Lang(), rpcMessage.Payload) if fnErr != nil { _ = session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_RUNTIME_FUNCTION_EXCEPTION), Message: fnErr.Error(), }}}, true) + rpcCode = code return false, nil } + rpcCode = codes.OK + sentBytes = int64(len([]byte(result))) + out := &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Rpc{Rpc: &api.Rpc{ Id: rpcMessage.Id, Payload: result, diff --git a/server/runtime_test.go b/server/runtime_test.go index 2ce71ce9d..1260fd98f 100644 --- a/server/runtime_test.go +++ b/server/runtime_test.go @@ -383,7 +383,7 @@ nakama.register_rpc(test.printWorld, "helloworld")`, } db := NewDB(t) - pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, nil, nil, nil, nil, nil, nil, nil, runtime) + pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, nil, nil, nil, nil, nil, nil, nil, runtime, metrics) apiServer := StartApiServer(logger, logger, db, protojsonMarshaler, protojsonUnmarshaler, cfg, "", nil, storageIdx, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, metrics, pipeline, runtime) defer apiServer.Stop()