Skip to content

Commit 41f9daf

Browse files
author
Ian Pye
committed
Added basic metrics for streaming system
1 parent 96f864d commit 41f9daf

File tree

2 files changed

+74
-5
lines changed

2 files changed

+74
-5
lines changed

util/slog/slog.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
package slog
22

33
import (
4+
json "encoding/json"
45
"fmt"
6+
zmq "github.com/pebbe/zmq3"
57
"logger"
68
"os"
9+
"stash.cloudflare.com/go-stream/util"
710
"strings"
11+
"time"
812
)
913

1014
var (
1115
LogPrefix string
12-
glog *logger.Logger // the main logger object
16+
glog *logger.Logger // the main logger object
17+
Gm *util.StreamingMetrics // Main metrics object
1318
)
1419

1520
// fatal: outputs a fatal startup error to STDERR, logs it to the
@@ -30,7 +35,7 @@ func exit(code int, l *logger.Logger, format string, v ...interface{}) {
3035
os.Exit(code)
3136
}
3237

33-
func Init(logName *string, logLevel *string, logPrefix *string) {
38+
func Init(logName *string, logLevel *string, logPrefix *string, metrics *util.StreamingMetrics, metricsAddr *string) {
3439
// Change logger level
3540
if err := logger.SetLogName(*logName); err != nil {
3641
fatal(nil, "Cannot set log name for program")
@@ -45,6 +50,9 @@ func Init(logName *string, logLevel *string, logPrefix *string) {
4550
fatal(nil, "Cannot start logger")
4651
}
4752
}
53+
54+
Gm = metrics
55+
go statsSender(metricsAddr, logPrefix)
4856
}
4957

5058
func Logf(level logger.Level, format string, v ...interface{}) {
@@ -56,3 +64,45 @@ func Logf(level logger.Level, format string, v ...interface{}) {
5664
func Fatalf(format string, v ...interface{}) {
5765
fatal(glog, format, v)
5866
}
67+
68+
type statsPkg struct {
69+
Name string
70+
TotalMsgs int64
71+
TotalErrors int64
72+
UpTime int64
73+
}
74+
75+
func statsSender(metricsAddr *string, processName *string) {
76+
77+
rep, err := zmq.NewSocket(zmq.REP)
78+
if err != nil {
79+
Logf(logger.Levels.Error, "Stats Sender error: %v", err.Error())
80+
return
81+
}
82+
defer rep.Close()
83+
err = rep.Bind(*metricsAddr)
84+
if err != nil {
85+
Logf(logger.Levels.Error, "Stats Sender error: %v", err.Error())
86+
return
87+
}
88+
89+
Logf(logger.Levels.Info, "Stats sender, listening on %s", *metricsAddr)
90+
91+
// Loop, printing the stats on request
92+
for {
93+
_, err := rep.Recv(0)
94+
if err != nil {
95+
Logf(logger.Levels.Error, "%v", err.Error())
96+
} else {
97+
timestamp := time.Now().Unix() - Gm.StartTime
98+
dBag := statsPkg{*processName, Gm.Total.Count(), Gm.Error.Count(), timestamp}
99+
stats, err := json.Marshal(dBag)
100+
if err == nil {
101+
_, err = rep.SendBytes(stats, zmq.DONTWAIT)
102+
if err != nil {
103+
Logf(logger.Levels.Error, "%v", err.Error())
104+
}
105+
}
106+
}
107+
}
108+
}

util/util.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package util
22

3-
import "log"
4-
5-
var _ = log.Printf
3+
import (
4+
metrics "github.com/rcrowley/go-metrics"
5+
"time"
6+
)
67

78
type MemoryBuffer struct {
89
buf [][]byte
@@ -165,3 +166,21 @@ func (buf *SequentialBufferChanImpl) Reset() [][]byte {
165166
buf.seq = len(buf.chanbuf) + 1
166167
return ret
167168
}
169+
170+
type StreamingMetrics struct {
171+
Reg metrics.Registry
172+
Total metrics.Counter // total count of packets.
173+
Current metrics.Counter // packets in the last period
174+
Error metrics.Counter // total count of packets that are dropped
175+
StartTime int64 // How long we've been running for
176+
}
177+
178+
func NewStreamingMetrics(mReg metrics.Registry) *StreamingMetrics {
179+
return &StreamingMetrics{
180+
Reg: mReg,
181+
Total: metrics.NewCounter(),
182+
Current: metrics.NewCounter(),
183+
Error: metrics.NewCounter(),
184+
StartTime: time.Now().Unix(),
185+
}
186+
}

0 commit comments

Comments
 (0)