Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func main() {

leaderboardScheduler.Start(runtime)

pipeline := server.NewPipeline(logger, config, db, jsonpbMarshaler, jsonpbUnmarshaler, sessionRegistry, statusRegistry, matchRegistry, partyRegistry, matchmaker, tracker, router, runtime)
pipeline := server.NewPipeline(logger, config, db, jsonpbMarshaler, jsonpbUnmarshaler, sessionRegistry, statusRegistry, matchRegistry, partyRegistry, matchmaker, tracker, router, runtime, metrics)
statusHandler := server.NewLocalStatusHandler(logger, sessionRegistry, matchRegistry, partyRegistry, tracker, metrics, config.GetName(), createTime)

telemetryEnabled := os.Getenv("NAKAMA_TELEMETRY") != "0"
Expand Down
2 changes: 1 addition & 1 deletion server/api_leaderboard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ nk.leaderboard_create(%q, %t, %q, %q, reset, metadata, %t)
tracker := &LocalTracker{}
sessionCache := NewLocalSessionCache(1_000, 3_600)

pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, nil, nil, nil, nil, nil, tracker, router, runtime)
pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, nil, nil, nil, nil, nil, tracker, router, runtime, metrics)

apiServer := StartApiServer(logger, logger, db, protojsonMarshaler,
protojsonUnmarshaler, cfg, "3.0.0", nil, nil, rtData.leaderboardCache,
Expand Down
2 changes: 1 addition & 1 deletion server/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func NewAPIServer(t *testing.T, runtime *Runtime) (*ApiServer, *Pipeline) {
sessionCache := NewLocalSessionCache(3_600, 7_200)
sessionRegistry := NewLocalSessionRegistry(metrics)
tracker := &LocalTracker{sessionRegistry: sessionRegistry}
pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, sessionRegistry, nil, nil, nil, nil, tracker, router, runtime)
pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, sessionRegistry, nil, nil, nil, nil, tracker, router, runtime, metrics)
apiServer := StartApiServer(logger, logger, db, protojsonMarshaler, protojsonUnmarshaler, cfg, "3.0.0", nil, storageIdx, nil, nil, sessionRegistry, sessionCache, nil, nil, nil, nil, tracker, router, nil, metrics, pipeline, runtime)

WaitForSocket(nil, cfg)
Expand Down
39 changes: 1 addition & 38 deletions server/match_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
)

// loggerForTest allows for easily adjusting log output produced by tests in one place
Expand All @@ -54,7 +53,7 @@ func createTestMatchRegistry(t fatalable, logger *zap.Logger) (*LocalMatchRegist
cfg.GetMatch().LabelUpdateIntervalMs = int(time.Hour / time.Millisecond)
messageRouter := &testMessageRouter{}
matchRegistry := NewLocalMatchRegistry(logger, logger, cfg, &testSessionRegistry{}, &testTracker{},
messageRouter, &testMetrics{}, "node")
messageRouter, metrics, "node")
mp := NewMatchProvider()

mp.RegisterCreateFn("go",
Expand Down Expand Up @@ -151,42 +150,6 @@ func (m *testMatch) MatchSignal(ctx context.Context, logger runtime.Logger, db *
return state, "signal received: " + data
}

// testMetrics implements the Metrics interface and does nothing
type testMetrics struct{}

func (m *testMetrics) Stop(logger *zap.Logger) {}
func (m *testMetrics) SnapshotLatencyMs() float64 { return 0 }
func (m *testMetrics) SnapshotRateSec() float64 { return 0 }
func (m *testMetrics) SnapshotRecvKbSec() float64 { return 0 }
func (m *testMetrics) SnapshotSentKbSec() float64 { return 0 }
func (m *testMetrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes int64, rpcCode codes.Code) {
}

func (m *testMetrics) ApiRpc(id string, elapsed time.Duration, recvBytes, sentBytes int64, rpcCode codes.Code) {
}
func (m *testMetrics) ApiBefore(name string, elapsed time.Duration, isErr bool) {}
func (m *testMetrics) ApiAfter(name string, elapsed time.Duration, isErr bool) {}
func (m *testMetrics) Message(recvBytes int64, isErr bool) {}
func (m *testMetrics) MessageBytesSent(sentBytes int64) {}
func (m *testMetrics) GaugeRuntimes(value float64) {}
func (m *testMetrics) GaugeLuaRuntimes(value float64) {}
func (m *testMetrics) GaugeJsRuntimes(value float64) {}
func (m *testMetrics) GaugeAuthoritativeMatches(value float64) {}
func (m *testMetrics) GaugeParties(value float64) {}
func (m *testMetrics) GaugeStorageIndexEntries(indexName string, value float64) {}
func (m *testMetrics) CountDroppedEvents(delta int64) {}
func (m *testMetrics) CountWebsocketOpened(delta int64) {}
func (m *testMetrics) CountWebsocketClosed(delta int64) {}
func (m *testMetrics) CountUntaggedGrpcStatsCalls(delta int64) {}
func (m *testMetrics) GaugeSessions(value float64) {}
func (m *testMetrics) GaugePresences(value float64) {}
func (m *testMetrics) Matchmaker(tickets, activeTickets float64, processTime time.Duration) {}
func (m *testMetrics) PresenceEvent(dequeueElapsed, processElapsed time.Duration) {}
func (m *testMetrics) StorageWriteRejectCount(tags map[string]string, delta int64) {}
func (m *testMetrics) CustomCounter(name string, tags map[string]string, delta int64) {}
func (m *testMetrics) CustomGauge(name string, tags map[string]string, value float64) {}
func (m *testMetrics) CustomTimer(name string, tags map[string]string, value time.Duration) {}

// testMessageRouter is used for testing, and can fire a callback
// when the SendToPresenceIDs method is invoked
type testMessageRouter struct {
Expand Down
1 change: 0 additions & 1 deletion server/matchmaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1639,7 +1639,6 @@ func createTestMatchmaker(t fatalable, logger *zap.Logger, tickerActive bool, me
}
sessionRegistry := &testSessionRegistry{}
tracker := &testTracker{}
metrics := &testMetrics{}

jsonpbMarshaler := &protojson.MarshalOptions{
UseEnumNumbers: true,
Expand Down
45 changes: 43 additions & 2 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Metrics interface {
ApiBefore(name string, elapsed time.Duration, isErr bool)
ApiAfter(name string, elapsed time.Duration, isErr bool)

WsRpc(id string, elapsed time.Duration, recvBytes, sentBytes int64, rpcCode codes.Code)

Message(recvBytes int64, isErr bool)
MessageBytesSent(sentBytes int64)

Expand Down Expand Up @@ -269,7 +271,7 @@ func (m *LocalMetrics) Api(id string, elapsed time.Duration, recvBytes, sentByte
}

// New metrics format
labels := map[string]string{"rpc_id": id, "rpc_code": rpcCode.String()}
labels := map[string]string{"transport": "api", "rpc_id": id, "rpc_code": rpcCode.String()}
labeledScope := m.PrometheusScope.Tagged(labels)
labeledScope.Counter("count").Inc(1)
labeledScope.Counter("recv_bytes").Inc(recvBytes)
Expand Down Expand Up @@ -308,7 +310,46 @@ func (m *LocalMetrics) ApiRpc(id string, elapsed time.Duration, recvBytes, sentB
}

// New metrics format
labels := map[string]string{"transport": "request", "rpc_id": id, "rpc_code": rpcCode.String()}
labels := map[string]string{"transport": "api_rpc", "rpc_id": id, "rpc_code": rpcCode.String()}
labeledScope := m.PrometheusScope.Tagged(labels)
labeledScope.Counter("count").Inc(1)
labeledScope.Counter("recv_bytes").Inc(recvBytes)
labeledScope.Counter("sent_bytes").Inc(sentBytes)
labeledScope.Timer("latency").Record(elapsed)
}

func (m *LocalMetrics) WsRpc(id string, elapsed time.Duration, recvBytes, sentBytes int64, rpcCode codes.Code) {
// Increment ongoing statistics for current measurement window.
m.currentMsTotal.Add(int64(elapsed / time.Millisecond))
m.currentReqCount.Inc()
m.currentRecvBytes.Add(recvBytes)
m.currentSentBytes.Add(sentBytes)

// Global stats.
m.PrometheusScope.Counter("overall_count").Inc(1)
m.PrometheusScope.Counter("overall_request_count").Inc(1)
m.PrometheusScope.Counter("overall_recv_bytes").Inc(recvBytes)
m.PrometheusScope.Counter("overall_request_recv_bytes").Inc(recvBytes)
m.PrometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
m.PrometheusScope.Counter("overall_request_sent_bytes").Inc(sentBytes)
m.PrometheusScope.Timer("overall_latency_ms").Record(elapsed)

// Per-endpoint stats.
taggedScope := m.PrometheusScope.Tagged(map[string]string{"rpc_id": id})
taggedScope.Counter("Rpc_count").Inc(1)
taggedScope.Counter("Rpc_recv_bytes").Inc(recvBytes)
taggedScope.Counter("Rpc_sent_bytes").Inc(sentBytes)
taggedScope.Timer("Rpc_latency_ms").Record(elapsed)

// Error stats if applicable.
if rpcCode != codes.OK {
m.PrometheusScope.Counter("overall_errors").Inc(1)
m.PrometheusScope.Counter("overall_request_errors").Inc(1)
taggedScope.Counter("Rpc_errors").Inc(1)
}

// New metrics format
labels := map[string]string{"transport": "ws_rpc", "rpc_id": id, "rpc_code": rpcCode.String()}
labeledScope := m.PrometheusScope.Tagged(labels)
labeledScope.Counter("count").Inc(1)
labeledScope.Counter("recv_bytes").Inc(recvBytes)
Expand Down
2 changes: 1 addition & 1 deletion server/party_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func createTestPartyHandler(t *testing.T, logger *zap.Logger, presence *rtapi.Us

dmr := DummyMessageRouter{}

pr := NewLocalPartyRegistry(context.Background(), logger, logger, cfg, node, &testMetrics{})
pr := NewLocalPartyRegistry(context.Background(), logger, logger, cfg, node, metrics)
pr.Init(mm, &tt, &tsm, &dmr)
ph := NewPartyHandler(logger, pr, mm, &tt, &tsm, &dmr, uuid.UUID{}, node, true, 10, presence)
return ph, cleanup
Expand Down
6 changes: 4 additions & 2 deletions server/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type Pipeline struct {
router MessageRouter
runtime *Runtime
node string
metrics Metrics
}

func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, sessionRegistry SessionRegistry, statusRegistry StatusRegistry, matchRegistry MatchRegistry, partyRegistry PartyRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, runtime *Runtime) *Pipeline {
func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, sessionRegistry SessionRegistry, statusRegistry StatusRegistry, matchRegistry MatchRegistry, partyRegistry PartyRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, runtime *Runtime, metrics *LocalMetrics) *Pipeline {
return &Pipeline{
logger: logger,
config: config,
Expand All @@ -59,6 +60,7 @@ func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, protojsonMarshal
router: router,
runtime: runtime,
node: config.GetName(),
metrics: metrics,
}
}

Expand Down Expand Up @@ -155,7 +157,7 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, in *rtapi

switch in.Message.(type) {
case *rtapi.Envelope_Rpc:
// No before/after hooks on RPC.
// No before/after hooks on RPC. RPC handles its own metrics in pipeline_rpc.go.
default:
messageName = fmt.Sprintf("%T", in.Message)
messageNameID = strings.ToLower(messageName)
Expand Down
16 changes: 15 additions & 1 deletion server/pipeline_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package server
import (
"context"
"strings"
"time"

"github.com/heroiclabs/nakama-common/api"
"github.com/heroiclabs/nakama-common/rtapi"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
)

func (p *Pipeline) rpc(ctx context.Context, logger *zap.Logger, session Session, envelope *rtapi.Envelope) (bool, *rtapi.Envelope) {
Expand Down Expand Up @@ -51,15 +53,27 @@ func (p *Pipeline) rpc(ctx context.Context, logger *zap.Logger, session Session,
}
}

result, fnErr, _ := fn(session.Context(), nil, nil, traceID, session.UserID().String(), session.Username(), session.Vars(), session.Expiry(), session.ID().String(), session.ClientIP(), session.ClientPort(), session.Lang(), rpcMessage.Payload)
start := time.Now().UTC()
rpcCode := codes.Unknown
recvBytes := int64(len([]byte(rpcMessage.Payload)))
var sentBytes int64
defer func() {
p.metrics.WsRpc(id, time.Since(start), recvBytes, sentBytes, rpcCode)
}()

result, fnErr, code := fn(session.Context(), nil, nil, traceID, session.UserID().String(), session.Username(), session.Vars(), session.Expiry(), session.ID().String(), session.ClientIP(), session.ClientPort(), session.Lang(), rpcMessage.Payload)
if fnErr != nil {
_ = session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
Code: int32(rtapi.Error_RUNTIME_FUNCTION_EXCEPTION),
Message: fnErr.Error(),
}}}, true)
rpcCode = code
return false, nil
}

rpcCode = codes.OK
sentBytes = int64(len([]byte(result)))

out := &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Rpc{Rpc: &api.Rpc{
Id: rpcMessage.Id,
Payload: result,
Expand Down
2 changes: 1 addition & 1 deletion server/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ nakama.register_rpc(test.printWorld, "helloworld")`,
}

db := NewDB(t)
pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, nil, nil, nil, nil, nil, nil, nil, runtime)
pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, nil, nil, nil, nil, nil, nil, nil, runtime, metrics)
apiServer := StartApiServer(logger, logger, db, protojsonMarshaler, protojsonUnmarshaler, cfg, "", nil, storageIdx, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, metrics, pipeline, runtime)
defer apiServer.Stop()

Expand Down
Loading