Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
journaled (6.2.2)
journaled (6.2.3)
activejob
activerecord
activesupport
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails_7_2.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
journaled (6.2.2)
journaled (6.2.3)
activejob
activerecord
activesupport
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails_8_0.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
journaled (6.2.2)
journaled (6.2.3)
activejob
activerecord
activesupport
Expand Down
21 changes: 14 additions & 7 deletions lib/journaled/kinesis_batch_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ def process_response(response, stream_events)
event = stream_events[index]

if record_result.error_code
failed << create_failed_event(event, record_result)
failed << create_failed_event(
event,
error_code: record_result.error_code,
error_message: record_result.error_message,
transient: PERMANENT_ERROR_CODES.exclude?(record_result.error_code),
)
else
succeeded << event
end
Expand All @@ -77,21 +82,23 @@ def process_response(response, stream_events)
{ succeeded:, failed: }
end

def create_failed_event(event, record_result)
def create_failed_event(event, error_code:, error_message:, transient:)
Outbox::MetricEmitter.emit_kinesis_failure(event:, error_code:)

Journaled::KinesisFailedEvent.new(
event:,
error_code: record_result.error_code,
error_message: record_result.error_message,
transient: PERMANENT_ERROR_CODES.exclude?(record_result.error_code),
error_code:,
error_message:,
transient:,
)
end

def handle_transient_batch_error(error, stream_events)
Rails.logger.error("Kinesis batch send failed (transient): #{error.class} - #{error.message}")

failed = stream_events.map do |event|
Journaled::KinesisFailedEvent.new(
event:,
create_failed_event(
event,
error_code: error.class.to_s,
error_message: error.message,
transient: true,
Expand Down
10 changes: 8 additions & 2 deletions lib/journaled/kinesis_sequential_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,23 @@ def send_event(event)
event
rescue *PERMANENT_ERROR_CLASSES => e
Rails.logger.error("[Journaled] Kinesis event send failed (permanent): #{e.class} - #{e.message}")
error_code = e.class.to_s
Outbox::MetricEmitter.emit_kinesis_failure(event:, error_code:)

Journaled::KinesisFailedEvent.new(
event:,
error_code: e.class.to_s,
error_code:,
error_message: e.message,
transient: false,
)
rescue StandardError => e
Rails.logger.error("[Journaled] Kinesis event send failed (transient): #{e.class} - #{e.message}")
error_code = e.class.to_s
Outbox::MetricEmitter.emit_kinesis_failure(event:, error_code:)

Journaled::KinesisFailedEvent.new(
event:,
error_code: e.class.to_s,
error_code:,
error_message: e.message,
transient: true,
)
Expand Down
139 changes: 74 additions & 65 deletions lib/journaled/outbox/metric_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,83 +2,92 @@

module Journaled
module Outbox
# Handles metric emission for the Worker
# Handles metric emission for the Worker and Kinesis senders
#
# This class is responsible for collecting and emitting metrics about the outbox queue.
# This class provides utility methods for collecting and emitting metrics.
class MetricEmitter
def initialize(worker_id:)
@worker_id = worker_id
end
class << self
# Emit batch processing metrics
#
# @param stats [Hash] Processing statistics with :succeeded, :failed_permanently, :failed_transiently
# @param worker_id [String] ID of the worker processing the batch
def emit_batch_metrics(stats, worker_id:)
total_events = stats[:succeeded] + stats[:failed_permanently] + stats[:failed_transiently]

# Emit batch processing metrics
#
# @param stats [Hash] Processing statistics with :succeeded, :failed_permanently, :failed_transiently
def emit_batch_metrics(stats)
total_events = stats[:succeeded] + stats[:failed_permanently] + stats[:failed_transiently]
emit_metric('journaled.outbox_event.processed', value: total_events, worker_id:)
emit_metric('journaled.outbox_event.sent', value: stats[:succeeded], worker_id:)
emit_metric('journaled.outbox_event.failed', value: stats[:failed_permanently], worker_id:)
emit_metric('journaled.outbox_event.errored', value: stats[:failed_transiently], worker_id:)
end

emit_metric('journaled.worker.batch_process', value: total_events)
emit_metric('journaled.worker.batch_sent', value: stats[:succeeded])
emit_metric('journaled.worker.batch_failed_permanently', value: stats[:failed_permanently])
emit_metric('journaled.worker.batch_failed_transiently', value: stats[:failed_transiently])
end
# Collect and emit queue metrics
#
# This calculates various queue statistics and emits individual metrics for each.
# @param worker_id [String] ID of the worker collecting metrics
def emit_queue_metrics(worker_id:)
metrics = calculate_queue_metrics

# Collect and emit queue metrics
#
# This calculates various queue statistics and emits individual metrics for each.
def emit_queue_metrics
metrics = calculate_queue_metrics
emit_metric('journaled.worker.queue_total_count', value: metrics[:total_count], worker_id:)
emit_metric('journaled.worker.queue_workable_count', value: metrics[:workable_count], worker_id:)
emit_metric('journaled.worker.queue_failed_count', value: metrics[:failed_count], worker_id:)
emit_metric('journaled.worker.queue_oldest_age_seconds', value: metrics[:oldest_age_seconds], worker_id:)

emit_metric('journaled.worker.queue_total_count', value: metrics[:total_count])
emit_metric('journaled.worker.queue_workable_count', value: metrics[:workable_count])
emit_metric('journaled.worker.queue_erroring_count', value: metrics[:erroring_count])
emit_metric('journaled.worker.queue_oldest_age_seconds', value: metrics[:oldest_age_seconds])
Rails.logger.info(
"Queue metrics: total=#{metrics[:total_count]}, " \
"workable=#{metrics[:workable_count]}, " \
"failed=#{metrics[:failed_count]}, " \
"oldest_age=#{metrics[:oldest_age_seconds].round(2)}s",
)
end

Rails.logger.info(
"Queue metrics: total=#{metrics[:total_count]}, " \
"workable=#{metrics[:workable_count]}, " \
"erroring=#{metrics[:erroring_count]}, " \
"oldest_age=#{metrics[:oldest_age_seconds].round(2)}s",
)
end
# Emit a metric notification for a Kinesis send failure
#
# @param event [Journaled::Outbox::Event] The failed event
# @param error_code [String] The error code (e.g., 'ProvisionedThroughputExceededException')
def emit_kinesis_failure(event:, error_code:)
emit_metric(
'journaled.kinesis.send_failure',
partition_key: event.partition_key,
error_code:,
stream_name: event.stream_name,
event_type: event.event_type,
)
end

private
private

attr_reader :worker_id

# Emit a single metric notification
#
# @param event_name [String] The name of the metric event
# @param payload [Hash] Additional payload data (event_count, value, etc.)
def emit_metric(event_name, payload)
ActiveSupport::Notifications.instrument(
event_name,
payload.merge(worker_id:),
)
end
# Emit a single metric notification
#
# @param event_name [String] The name of the metric event
# @param payload [Hash] Additional payload data (event_count, value, etc.)
def emit_metric(event_name, payload)
ActiveSupport::Notifications.instrument(event_name, payload)
end

# Calculate queue metrics
#
# @return [Hash] Metrics including counts and oldest event timestamp
def calculate_queue_metrics
# Use a single query with COUNT(*) FILTER to calculate all counts in one table scan
result = Event.connection.select_one(
Event.select(
'COUNT(*) AS total_count',
'COUNT(*) FILTER (WHERE failed_at IS NULL) AS workable_count',
'COUNT(*) FILTER (WHERE failure_reason IS NOT NULL AND failed_at IS NULL) AS erroring_count',
'MIN(created_at) FILTER (WHERE failed_at IS NULL) AS oldest_non_failed_timestamp',
).to_sql,
)
# Calculate queue metrics
#
# @return [Hash] Metrics including counts and oldest event timestamp
def calculate_queue_metrics
# Use a single query with COUNT(*) FILTER to calculate all counts in one table scan
result = Event.connection.select_one(
Event.select(
'COUNT(*) AS total_count',
'COUNT(*) FILTER (WHERE failed_at IS NULL) AS workable_count',
'COUNT(*) FILTER (WHERE failure_reason IS NOT NULL AND failed_at IS NULL) AS failed_count',
'MIN(created_at) FILTER (WHERE failed_at IS NULL) AS oldest_non_failed_timestamp',
).to_sql,
)

oldest_timestamp = result['oldest_non_failed_timestamp']
oldest_age_seconds = oldest_timestamp ? Time.current - oldest_timestamp : 0
oldest_timestamp = result['oldest_non_failed_timestamp']
oldest_age_seconds = oldest_timestamp ? Time.current - oldest_timestamp : 0

{
total_count: result['total_count'],
workable_count: result['workable_count'],
erroring_count: result['erroring_count'],
oldest_age_seconds:,
}
{
total_count: result['total_count'],
workable_count: result['workable_count'],
failed_count: result['failed_count'],
oldest_age_seconds:,
}
end
end
end
end
Expand Down
7 changes: 3 additions & 4 deletions lib/journaled/outbox/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def initialize
@worker_id = "#{Socket.gethostname}-#{Process.pid}"
self.running = false
@processor = BatchProcessor.new
@metric_emitter = MetricEmitter.new(worker_id: @worker_id)
self.shutdown_requested = false
@last_metrics_emission = Time.current
end
Expand Down Expand Up @@ -50,7 +49,7 @@ def running?

private

attr_reader :worker_id, :processor, :metric_emitter
attr_reader :worker_id, :processor
attr_accessor :shutdown_requested, :running, :last_metrics_emission

def run_loop
Expand All @@ -77,7 +76,7 @@ def run_loop
def process_batch
stats = processor.process_batch

metric_emitter.emit_batch_metrics(stats)
MetricEmitter.emit_batch_metrics(stats, worker_id:)
end

def check_prerequisites!
Expand Down Expand Up @@ -120,7 +119,7 @@ def emit_metrics_if_needed

# Collect and emit queue metrics
def collect_and_emit_metrics
metric_emitter.emit_queue_metrics
MetricEmitter.emit_queue_metrics(worker_id:)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/journaled/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Journaled
VERSION = "6.2.2"
VERSION = "6.2.3"
end
Loading