Skip to content

Commit 8a68afb

Browse files
authored
feat: add failure metrics to kinesis outbox event processing (#59)
* Refactor metric emitter to be a utility class, emit counter metric for failed kinesis events with error code metadata
* bump version to 6.2.3
* share failed event creation logic between batch and individual event failures
* align event names with delayed job terminology
1 parent f9d0f10 commit 8a68afb

File tree

9 files changed

+127
-106
lines changed

9 files changed

+127
-106
lines changed

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: .
33
specs:
4-
journaled (6.2.2)
4+
journaled (6.2.3)
55
activejob
66
activerecord
77
activesupport

gemfiles/rails_7_2.gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: ..
33
specs:
4-
journaled (6.2.2)
4+
journaled (6.2.3)
55
activejob
66
activerecord
77
activesupport

gemfiles/rails_8_0.gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: ..
33
specs:
4-
journaled (6.2.2)
4+
journaled (6.2.3)
55
activejob
66
activerecord
77
activesupport

lib/journaled/kinesis_batch_sender.rb

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ def process_response(response, stream_events)
6868
event = stream_events[index]
6969

7070
if record_result.error_code
71-
failed << create_failed_event(event, record_result)
71+
failed << create_failed_event(
72+
event,
73+
error_code: record_result.error_code,
74+
error_message: record_result.error_message,
75+
transient: PERMANENT_ERROR_CODES.exclude?(record_result.error_code),
76+
)
7277
else
7378
succeeded << event
7479
end
@@ -77,21 +82,23 @@ def process_response(response, stream_events)
7782
{ succeeded:, failed: }
7883
end
7984

80-
def create_failed_event(event, record_result)
85+
def create_failed_event(event, error_code:, error_message:, transient:)
86+
Outbox::MetricEmitter.emit_kinesis_failure(event:, error_code:)
87+
8188
Journaled::KinesisFailedEvent.new(
8289
event:,
83-
error_code: record_result.error_code,
84-
error_message: record_result.error_message,
85-
transient: PERMANENT_ERROR_CODES.exclude?(record_result.error_code),
90+
error_code:,
91+
error_message:,
92+
transient:,
8693
)
8794
end
8895

8996
def handle_transient_batch_error(error, stream_events)
9097
Rails.logger.error("Kinesis batch send failed (transient): #{error.class} - #{error.message}")
9198

9299
failed = stream_events.map do |event|
93-
Journaled::KinesisFailedEvent.new(
94-
event:,
100+
create_failed_event(
101+
event,
95102
error_code: error.class.to_s,
96103
error_message: error.message,
97104
transient: true,

lib/journaled/kinesis_sequential_sender.rb

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,23 @@ def send_event(event)
5858
event
5959
rescue *PERMANENT_ERROR_CLASSES => e
6060
Rails.logger.error("[Journaled] Kinesis event send failed (permanent): #{e.class} - #{e.message}")
61+
error_code = e.class.to_s
62+
Outbox::MetricEmitter.emit_kinesis_failure(event:, error_code:)
63+
6164
Journaled::KinesisFailedEvent.new(
6265
event:,
63-
error_code: e.class.to_s,
66+
error_code:,
6467
error_message: e.message,
6568
transient: false,
6669
)
6770
rescue StandardError => e
6871
Rails.logger.error("[Journaled] Kinesis event send failed (transient): #{e.class} - #{e.message}")
72+
error_code = e.class.to_s
73+
Outbox::MetricEmitter.emit_kinesis_failure(event:, error_code:)
74+
6975
Journaled::KinesisFailedEvent.new(
7076
event:,
71-
error_code: e.class.to_s,
77+
error_code:,
7278
error_message: e.message,
7379
transient: true,
7480
)

lib/journaled/outbox/metric_emitter.rb

Lines changed: 74 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -2,83 +2,92 @@
22

33
module Journaled
44
module Outbox
5-
# Handles metric emission for the Worker
5+
# Handles metric emission for the Worker and Kinesis senders
66
#
7-
# This class is responsible for collecting and emitting metrics about the outbox queue.
7+
# This class provides utility methods for collecting and emitting metrics.
88
class MetricEmitter
9-
def initialize(worker_id:)
10-
@worker_id = worker_id
11-
end
9+
class << self
10+
# Emit batch processing metrics
11+
#
12+
# @param stats [Hash] Processing statistics with :succeeded, :failed_permanently, :failed_transiently
13+
# @param worker_id [String] ID of the worker processing the batch
14+
def emit_batch_metrics(stats, worker_id:)
15+
total_events = stats[:succeeded] + stats[:failed_permanently] + stats[:failed_transiently]
1216

13-
# Emit batch processing metrics
14-
#
15-
# @param stats [Hash] Processing statistics with :succeeded, :failed_permanently, :failed_transiently
16-
def emit_batch_metrics(stats)
17-
total_events = stats[:succeeded] + stats[:failed_permanently] + stats[:failed_transiently]
17+
emit_metric('journaled.outbox_event.processed', value: total_events, worker_id:)
18+
emit_metric('journaled.outbox_event.sent', value: stats[:succeeded], worker_id:)
19+
emit_metric('journaled.outbox_event.failed', value: stats[:failed_permanently], worker_id:)
20+
emit_metric('journaled.outbox_event.errored', value: stats[:failed_transiently], worker_id:)
21+
end
1822

19-
emit_metric('journaled.worker.batch_process', value: total_events)
20-
emit_metric('journaled.worker.batch_sent', value: stats[:succeeded])
21-
emit_metric('journaled.worker.batch_failed_permanently', value: stats[:failed_permanently])
22-
emit_metric('journaled.worker.batch_failed_transiently', value: stats[:failed_transiently])
23-
end
23+
# Collect and emit queue metrics
24+
#
25+
# This calculates various queue statistics and emits individual metrics for each.
26+
# @param worker_id [String] ID of the worker collecting metrics
27+
def emit_queue_metrics(worker_id:)
28+
metrics = calculate_queue_metrics
2429

25-
# Collect and emit queue metrics
26-
#
27-
# This calculates various queue statistics and emits individual metrics for each.
28-
def emit_queue_metrics
29-
metrics = calculate_queue_metrics
30+
emit_metric('journaled.worker.queue_total_count', value: metrics[:total_count], worker_id:)
31+
emit_metric('journaled.worker.queue_workable_count', value: metrics[:workable_count], worker_id:)
32+
emit_metric('journaled.worker.queue_failed_count', value: metrics[:failed_count], worker_id:)
33+
emit_metric('journaled.worker.queue_oldest_age_seconds', value: metrics[:oldest_age_seconds], worker_id:)
3034

31-
emit_metric('journaled.worker.queue_total_count', value: metrics[:total_count])
32-
emit_metric('journaled.worker.queue_workable_count', value: metrics[:workable_count])
33-
emit_metric('journaled.worker.queue_erroring_count', value: metrics[:erroring_count])
34-
emit_metric('journaled.worker.queue_oldest_age_seconds', value: metrics[:oldest_age_seconds])
35+
Rails.logger.info(
36+
"Queue metrics: total=#{metrics[:total_count]}, " \
37+
"workable=#{metrics[:workable_count]}, " \
38+
"failed=#{metrics[:failed_count]}, " \
39+
"oldest_age=#{metrics[:oldest_age_seconds].round(2)}s",
40+
)
41+
end
3542

36-
Rails.logger.info(
37-
"Queue metrics: total=#{metrics[:total_count]}, " \
38-
"workable=#{metrics[:workable_count]}, " \
39-
"erroring=#{metrics[:erroring_count]}, " \
40-
"oldest_age=#{metrics[:oldest_age_seconds].round(2)}s",
41-
)
42-
end
43+
# Emit a metric notification for a Kinesis send failure
44+
#
45+
# @param event [Journaled::Outbox::Event] The failed event
46+
# @param error_code [String] The error code (e.g., 'ProvisionedThroughputExceededException')
47+
def emit_kinesis_failure(event:, error_code:)
48+
emit_metric(
49+
'journaled.kinesis.send_failure',
50+
partition_key: event.partition_key,
51+
error_code:,
52+
stream_name: event.stream_name,
53+
event_type: event.event_type,
54+
)
55+
end
4356

44-
private
57+
private
4558

46-
attr_reader :worker_id
47-
48-
# Emit a single metric notification
49-
#
50-
# @param event_name [String] The name of the metric event
51-
# @param payload [Hash] Additional payload data (event_count, value, etc.)
52-
def emit_metric(event_name, payload)
53-
ActiveSupport::Notifications.instrument(
54-
event_name,
55-
payload.merge(worker_id:),
56-
)
57-
end
59+
# Emit a single metric notification
60+
#
61+
# @param event_name [String] The name of the metric event
62+
# @param payload [Hash] Additional payload data (event_count, value, etc.)
63+
def emit_metric(event_name, payload)
64+
ActiveSupport::Notifications.instrument(event_name, payload)
65+
end
5866

59-
# Calculate queue metrics
60-
#
61-
# @return [Hash] Metrics including counts and oldest event timestamp
62-
def calculate_queue_metrics
63-
# Use a single query with COUNT(*) FILTER to calculate all counts in one table scan
64-
result = Event.connection.select_one(
65-
Event.select(
66-
'COUNT(*) AS total_count',
67-
'COUNT(*) FILTER (WHERE failed_at IS NULL) AS workable_count',
68-
'COUNT(*) FILTER (WHERE failure_reason IS NOT NULL AND failed_at IS NULL) AS erroring_count',
69-
'MIN(created_at) FILTER (WHERE failed_at IS NULL) AS oldest_non_failed_timestamp',
70-
).to_sql,
71-
)
67+
# Calculate queue metrics
68+
#
69+
# @return [Hash] Metrics including counts and oldest event timestamp
70+
def calculate_queue_metrics
71+
# Use a single query with COUNT(*) FILTER to calculate all counts in one table scan
72+
result = Event.connection.select_one(
73+
Event.select(
74+
'COUNT(*) AS total_count',
75+
'COUNT(*) FILTER (WHERE failed_at IS NULL) AS workable_count',
76+
'COUNT(*) FILTER (WHERE failure_reason IS NOT NULL AND failed_at IS NULL) AS failed_count',
77+
'MIN(created_at) FILTER (WHERE failed_at IS NULL) AS oldest_non_failed_timestamp',
78+
).to_sql,
79+
)
7280

73-
oldest_timestamp = result['oldest_non_failed_timestamp']
74-
oldest_age_seconds = oldest_timestamp ? Time.current - oldest_timestamp : 0
81+
oldest_timestamp = result['oldest_non_failed_timestamp']
82+
oldest_age_seconds = oldest_timestamp ? Time.current - oldest_timestamp : 0
7583

76-
{
77-
total_count: result['total_count'],
78-
workable_count: result['workable_count'],
79-
erroring_count: result['erroring_count'],
80-
oldest_age_seconds:,
81-
}
84+
{
85+
total_count: result['total_count'],
86+
workable_count: result['workable_count'],
87+
failed_count: result['failed_count'],
88+
oldest_age_seconds:,
89+
}
90+
end
8291
end
8392
end
8493
end

lib/journaled/outbox/worker.rb

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ def initialize
1818
@worker_id = "#{Socket.gethostname}-#{Process.pid}"
1919
self.running = false
2020
@processor = BatchProcessor.new
21-
@metric_emitter = MetricEmitter.new(worker_id: @worker_id)
2221
self.shutdown_requested = false
2322
@last_metrics_emission = Time.current
2423
end
@@ -50,7 +49,7 @@ def running?
5049

5150
private
5251

53-
attr_reader :worker_id, :processor, :metric_emitter
52+
attr_reader :worker_id, :processor
5453
attr_accessor :shutdown_requested, :running, :last_metrics_emission
5554

5655
def run_loop
@@ -77,7 +76,7 @@ def run_loop
7776
def process_batch
7877
stats = processor.process_batch
7978

80-
metric_emitter.emit_batch_metrics(stats)
79+
MetricEmitter.emit_batch_metrics(stats, worker_id:)
8180
end
8281

8382
def check_prerequisites!
@@ -120,7 +119,7 @@ def emit_metrics_if_needed
120119

121120
# Collect and emit queue metrics
122121
def collect_and_emit_metrics
123-
metric_emitter.emit_queue_metrics
122+
MetricEmitter.emit_queue_metrics(worker_id:)
124123
end
125124
end
126125
end

lib/journaled/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# frozen_string_literal: true
22

33
module Journaled
4-
VERSION = "6.2.2"
4+
VERSION = "6.2.3"
55
end

0 commit comments

Comments
 (0)