Skip to content

Commit 92431b4

Browse files
committed
feat: submit histogram of metrics on a minute-by-minute basis
1 parent a3bb944 commit 92431b4

File tree

1 file changed

+161
-42
lines changed

1 file changed

+161
-42
lines changed

slowlog_check.rb

Lines changed: 161 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
require 'dogapi'
88

99
LOGGER = Logger.new($stdout)
10-
LOGGER.level = Logger::INFO
10+
LOGGER.level = Logger::DEBUG
11+
12+
METRICNAME = 'scribddev.redis.slowlog.micros'
1113

1214
REDIS = Redis.new(
1315
host: ENV.fetch('REDIS_HOST'),
@@ -26,7 +28,7 @@ def log_context
2628
LOGGER.debug(@event)
2729
end
2830

29-
def time
31+
def event_time
3032
# DateTime because Time does not natively parse AWS CloudWatch Event time
3133
DateTime.rfc3339(@event.fetch("time", DateTime.now.rfc3339))
3234
end
@@ -40,49 +42,48 @@ def replication_group
4042
end
4143
end
4244

45+
# TODO: Rather than hard-coding a one-day lookback,
46+
# look back at an increasing increment until hitting some max value
4347
def last_datadog_metrics_submitted_by_me_in_the_last_day
4448
resp = DDOG.get_points(
45-
"scribd.slowlog_check.slowlog{replication_group:#{replication_group}}",
49+
"#{METRICNAME}{replication_group:#{replication_group}}",
4650
Time.now - 86400,
4751
Time.now
4852
)
4953

50-
raise "Error getting last datadog metric submitted by me" unless resp[0] == "200"
54+
raise "Error getting last datadog metric submitted by me" unless resp[1].fetch("status") == "ok"
5155
resp
5256
end
5357

54-
def last_datadog_metric
58+
def minute_precision(time)
5559
Time.at(
56-
last_datadog_metrics_submitted_by_me_in_the_last_day[1]
57-
.fetch("series")
58-
.first
59-
.fetch("pointlist")
60-
.map {|x| x[0]}
61-
.max
62-
.to_i / 1000
60+
time.to_i - (time.to_i % 60)
6361
)
6462
end
6563

64+
def last_datadog_metric
65+
series = last_datadog_metrics_submitted_by_me_in_the_last_day[1].fetch("series")
66+
if series == [] # First invocation
67+
return Time.at(0)
68+
else
69+
minute_precision(
70+
Time.at(
71+
series
72+
.first
73+
.fetch("pointlist")
74+
.map {|x| x[0]}
75+
.max
76+
.to_i / 1000
77+
)
78+
)
79+
end
80+
end
81+
6682
def last_time_submitted
6783
return @last_time_submitted if defined? @last_time_submitted
6884
@last_time_submitted = last_datadog_metric
6985
end
7086

71-
def emit_point(time, value, tags)
72-
LOGGER.info "Sending slowlog entry: #{value}µs executing #{tags[:command]} at #{time}."
73-
resp = DDOG.emit_points(
74-
'redis.slowlog.micros.avg',
75-
[[time, value]],
76-
{
77-
host: replication_group,
78-
tags: tags
79-
}
80-
)
81-
raise "Error submitting metric for #{replication_group}" unless resp[0] == "202"
82-
@last_time_submitted = time
83-
resp
84-
end
85-
8687
def slowlog_time(slowlog)
8788
Time.at slowlog[1]
8889
end
@@ -91,15 +92,86 @@ def slowlog_microseconds(slowlog)
9192
slowlog[2]
9293
end
9394

94-
def client_ip(ip_and_port)
95-
ip_and_port.split(':')[0]
95+
def reporting_interval
96+
now_i = Time.now.to_i
97+
start_time_i = last_time_submitted.to_i + 60
98+
times = (start_time_i..now_i).step(60).to_a
99+
Hash[times.collect {|time| [Time.at(time), nil]}]
96100
end
97101

98-
def slowlog_tags(slowlog)
102+
def _95percentile(sorted_values)
103+
index = (sorted_values.length * 0.95) - 1
104+
sorted_values[index]
105+
end
106+
107+
def add_metric_to_bucket(prior, new)
108+
new_values = prior[:values].push(new)
109+
new_count = prior[:count] += 1
110+
new_avg = (prior[:avg] * prior[:count] + new) / new_count
111+
112+
sorted_values = new_values.sort
113+
new_median = sorted_values[sorted_values.count / 2]
114+
new_95percentile = _95percentile(sorted_values)
115+
new_min = sorted_values[0]
116+
new_max = sorted_values[-1]
117+
new_sum = sorted_values.reduce(:+)
118+
119+
{
120+
values: new_values,
121+
avg: new_avg,
122+
count: new_count,
123+
median: new_median,
124+
_95percentile: new_95percentile,
125+
min: new_min,
126+
max: new_max,
127+
sum: new_sum
128+
}
129+
end
130+
131+
def slowlogs_by_flush_interval
132+
result = reporting_interval
133+
REDIS.slowlog('get').each do |slowlog|
134+
time = slowlog_time(slowlog)
135+
break if minute_precision(time) <= minute_precision(last_time_submitted)
136+
137+
command = slowlog[3][0]
138+
value = slowlog_microseconds(slowlog)
139+
bucket = minute_precision(time)
140+
141+
if result[bucket].nil?
142+
result[bucket] = {
143+
command => {
144+
values: [value],
145+
avg: value,
146+
count: 1,
147+
median: value,
148+
_95percentile: value,
149+
min: value,
150+
max: value,
151+
sum: value
152+
}
153+
}
154+
elsif result[bucket][command].nil?
155+
result[bucket][command] = {
156+
values: [value],
157+
avg: value,
158+
count: 1,
159+
median: value,
160+
_95percentile: value,
161+
min: value,
162+
max: value,
163+
sum: value
164+
}
165+
else
166+
result[bucket][command] = add_metric_to_bucket(result[bucket][command], value)
167+
end
168+
end
169+
170+
result
171+
end
172+
173+
def default_tags
99174
{
100-
command: slowlog[3][0],
101-
client: client_ip(slowlog[4]),
102-
client_name: slowlog[5],
103175
replication_group: replication_group,
104176
service: replication_group,
105177
namespace: ENV.fetch('NAMESPACE'),
@@ -108,26 +180,69 @@ def slowlog_tags(slowlog)
108180
}
109181
end
110182

183+
def emit_point(params)
184+
metric = METRICNAME + '.' + params.fetch(:metric)
185+
type = params.fetch(:type, 'gauge')
186+
interval = params.fetch(:interval, 60)
187+
points = params.fetch(:points)
188+
host = params.fetch(:host, replication_group)
189+
tags = params.fetch(:tags, default_tags)
190+
191+
LOGGER.info "Sending slowlog entry: #{points.first[1]}µs executing #{tags[:command]} at #{points.first[0]}."
192+
resp = DDOG.emit_points(
193+
metric,
194+
points,
195+
{
196+
type: type,
197+
interval: interval,
198+
host: host,
199+
tags: tags
200+
}
201+
)
202+
raise "Error submitting metric for #{replication_group}" unless resp[1].fetch("status") == "ok"
203+
@last_time_submitted = Time.at(points.first[0])
204+
resp
205+
end
206+
111207
def ship_slowlogs
112-
REDIS.slowlog('get').each do |slowlog|
113-
break if slowlog_time(slowlog) <= last_time_submitted
114-
emit_point(
115-
slowlog_time(slowlog),
116-
slowlog_microseconds(slowlog),
117-
slowlog_tags(slowlog)
118-
)
208+
slowlogs = slowlogs_by_flush_interval
209+
slowlogs.keys.sort.each do |timestamp|
210+
timebucket = slowlogs.fetch(timestamp)
211+
next if timebucket.nil?
212+
213+
timebucket.keys.each do |command|
214+
all_metrics = timebucket.fetch(command)
215+
216+
# Emit most metrics
217+
[:avg, :count, :median, :min, :max].each do |metric|
218+
emit_point(
219+
metric: metric.to_s,
220+
type: metric == :count ? 'rate' : 'gauge',
221+
points: [[timestamp, all_metrics.fetch(metric)]],
222+
tags: default_tags.merge(command: command)
223+
)
224+
end
225+
226+
# Ruby symbols cannot start with a digit, hence the leading underscore in :_95percentile
227+
emit_point(
228+
metric: '95percentile',
229+
points: [[timestamp, all_metrics.fetch(:_95percentile)]],
230+
tags: default_tags.merge(command: command)
231+
)
232+
233+
end
119234
end
120235
end
121236

122237

238+
123239
def lambda_handler(event: {}, context: {})
124240
@event = event
125241

126242
log_context
127-
LOGGER.info "Event time: #{time}."
243+
LOGGER.info "Event time: #{event_time}."
128244
begin
129245
REDIS.ping
130-
ship_slowlogs
131246
rescue StandardError => e
132247
LOGGER.error e.inspect
133248
# => #<Redis::CannotConnectError: Timed out connecting to Redis on 10.0.1.1:6380>
@@ -140,4 +255,8 @@ def lambda_handler(event: {}, context: {})
140255

141256
if __FILE__ == $0
142257
lambda_handler
258+
259+
require 'pry'
260+
binding.pry
261+
143262
end

0 commit comments

Comments
 (0)