Skip to content

Commit 148bf05

Browse files
author
Ian Pye
committed
Adding more extensive metrics
1 parent 41f9daf commit 148bf05

File tree

8 files changed

+61
-12
lines changed

8 files changed

+61
-12
lines changed

cluster/manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import "time"
55
/* There should be a separate manager for each service */
66
type Manager interface {
77
GetEra(t time.Time) Era
8+
GetCurrentEra() Era
89
}
910

1011
type StaticManager struct {
@@ -15,6 +16,10 @@ func (c *StaticManager) GetEra(t time.Time) Era {
1516
return c.E
1617
}
1718

19+
func (c *StaticManager) GetCurrentEra() Era {
20+
return c.E
21+
}
22+
1823
func NewStaticManager(e Era) *StaticManager {
1924
return &StaticManager{e}
2025
}

stream/chain.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@ func (c *inChain) In() chan Object {
170170
return ops[0].(In).In()
171171
}
172172

173+
func (c *inChain) GetInDepth() int {
174+
ops := c.Operators()
175+
return ops[0].(In).GetInDepth()
176+
}
177+
173178
func (c *inChain) SetIn(ch chan Object) {
174179
ops := c.Operators()
175180
ops[0].(In).SetIn(ch)

stream/operator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type Out interface {
4242
type In interface {
4343
In() chan Object
4444
SetIn(c chan Object)
45+
GetInDepth() int
4546
}
4647

4748
type InOutOperator interface {

stream/util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ func (o *BaseIn) In() chan Object {
2121
return o.in
2222
}
2323

24+
func (o *BaseIn) GetInDepth() int {
25+
return len(o.in)
26+
}
27+
2428
func (o *BaseIn) SetIn(c chan Object) {
2529
o.in = c
2630
}

transport/client.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ func (src *Client) Run() error {
7272
defer func() {
7373
src.running = false
7474
}()
75+
76+
slog.Gm.Register(stream.Name(src))
77+
go func(op string, s *Client) { // Update the queue depth on input for each phase
78+
for {
79+
slog.Gm.Update(&op, s.GetInDepth())
80+
time.Sleep(1 * time.Second)
81+
}
82+
}(stream.Name(src), src)
83+
7584
for src.retries < 3 {
7685
err := src.connect()
7786
if err == nil {
@@ -163,6 +172,7 @@ func (src *Client) connect() error {
163172
closing := false
164173

165174
//defer log.Println("Exiting client loop")
175+
opName := stream.Name(src)
166176
writesNotCompleted := uint(0)
167177
for {
168178
upstreamCh := src.In()
@@ -189,6 +199,7 @@ func (src *Client) connect() error {
189199
}
190200
sendData(sndChData, bytes, seq)
191201
writesNotCompleted += 1
202+
slog.Gm.Event(&opName) // These are bactched
192203
}
193204
case cnt := <-writeNotifier.NotificationChannel():
194205
writesNotCompleted -= cnt

transport/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func (src Server) Run() error {
7373
hardCloseListener(src.StopNotifier, scl, ln)
7474
}()
7575

76+
slog.Gm.Register(stream.Name(src))
7677
for {
7778
conn, err := ln.Accept()
7879
if err != nil {
@@ -121,6 +122,7 @@ func (src Server) handleConnection(conn net.Conn) {
121122
wg_sub := &sync.WaitGroup{}
122123
defer wg_sub.Wait()
123124

125+
opName := stream.Name(src)
124126
sndChData := make(chan stream.Object, 100)
125127
sndChCloseNotifier := make(chan bool, 1)
126128
defer close(sndChData)
@@ -168,6 +170,7 @@ func (src Server) handleConnection(conn net.Conn) {
168170
return
169171
}
170172
command, seq, payload, err := parseMsg(obj.([]byte))
173+
slog.Gm.Event(&opName)
171174

172175
if err == nil {
173176
if command == DATA {

util/slog/slog.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,9 @@ func Fatalf(format string, v ...interface{}) {
6666
}
6767

6868
type statsPkg struct {
69-
Name string
70-
TotalMsgs int64
71-
TotalErrors int64
72-
UpTime int64
69+
Name string
70+
UpTime int64
71+
OpMetrics map[string]interface{}
7372
}
7473

7574
func statsSender(metricsAddr *string, processName *string) {
@@ -95,7 +94,10 @@ func statsSender(metricsAddr *string, processName *string) {
9594
Logf(logger.Levels.Error, "%v", err.Error())
9695
} else {
9796
timestamp := time.Now().Unix() - Gm.StartTime
98-
dBag := statsPkg{*processName, Gm.Total.Count(), Gm.Error.Count(), timestamp}
97+
dBag := statsPkg{*processName, timestamp, map[string]interface{}{}}
98+
for k, v := range Gm.OpGroups {
99+
dBag.OpMetrics[k] = map[string]int64{"Events": v.Events.Count(), "Errors": v.Errors.Count(), "Queue": v.QueueLength.Value()}
100+
}
99101
stats, err := json.Marshal(dBag)
100102
if err == nil {
101103
_, err = rep.SendBytes(stats, zmq.DONTWAIT)

util/util.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,20 +167,38 @@ func (buf *SequentialBufferChanImpl) Reset() [][]byte {
167167
return ret
168168
}
169169

170+
type MetricsGroup struct {
171+
Events metrics.Counter
172+
Errors metrics.Counter
173+
QueueLength metrics.Gauge
174+
}
175+
170176
type StreamingMetrics struct {
171177
Reg metrics.Registry
172-
Total metrics.Counter // total count of packets.
173-
Current metrics.Counter // packets in the last period
174-
Error metrics.Counter // total count of packets that are dropped
175-
StartTime int64 // How long we've been running for
178+
OpGroups map[string]MetricsGroup // Each Op can have an associated metrics group
179+
StartTime int64 // How long we've been running for
180+
}
181+
182+
func (m *StreamingMetrics) Event(op *string) {
183+
m.OpGroups[*op].Events.Inc(1)
184+
}
185+
186+
func (m *StreamingMetrics) Error(op *string) {
187+
m.OpGroups[*op].Errors.Inc(1)
188+
}
189+
190+
func (m *StreamingMetrics) Update(op *string, v int) {
191+
m.OpGroups[*op].QueueLength.Update(int64(v))
192+
}
193+
194+
func (m *StreamingMetrics) Register(op string) {
195+
m.OpGroups[op] = MetricsGroup{metrics.NewCounter(), metrics.NewCounter(), metrics.NewGauge()}
176196
}
177197

178198
func NewStreamingMetrics(mReg metrics.Registry) *StreamingMetrics {
179199
return &StreamingMetrics{
180200
Reg: mReg,
181-
Total: metrics.NewCounter(),
182-
Current: metrics.NewCounter(),
183-
Error: metrics.NewCounter(),
201+
OpGroups: make(map[string]MetricsGroup),
184202
StartTime: time.Now().Unix(),
185203
}
186204
}

0 commit comments

Comments
 (0)