Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
journaled (6.2.2)
journaled (6.2.3)
activejob
activerecord
activesupport
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails_7_2.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
journaled (6.2.2)
journaled (6.2.3)
activejob
activerecord
activesupport
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails_8_0.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
journaled (6.2.2)
journaled (6.2.3)
activejob
activerecord
activesupport
Expand Down
21 changes: 14 additions & 7 deletions lib/journaled/kinesis_batch_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ def process_response(response, stream_events)
event = stream_events[index]

if record_result.error_code
failed << create_failed_event(event, record_result)
failed << create_failed_event(
event,
error_code: record_result.error_code,
error_message: record_result.error_message,
transient: PERMANENT_ERROR_CODES.exclude?(record_result.error_code),
)
else
succeeded << event
end
Expand All @@ -77,21 +82,23 @@ def process_response(response, stream_events)
{ succeeded:, failed: }
end

def create_failed_event(event, record_result)
def create_failed_event(event, error_code:, error_message:, transient:)
Outbox::MetricEmitter.emit_kinesis_failure(event:, error_code:)

Journaled::KinesisFailedEvent.new(
event:,
error_code: record_result.error_code,
error_message: record_result.error_message,
transient: PERMANENT_ERROR_CODES.exclude?(record_result.error_code),
error_code:,
error_message:,
transient:,
)
end

def handle_transient_batch_error(error, stream_events)
Rails.logger.error("Kinesis batch send failed (transient): #{error.class} - #{error.message}")

failed = stream_events.map do |event|
Journaled::KinesisFailedEvent.new(
event:,
create_failed_event(
event,
error_code: error.class.to_s,
error_message: error.message,
transient: true,
Expand Down
10 changes: 8 additions & 2 deletions lib/journaled/kinesis_sequential_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,23 @@ def send_event(event)
event
rescue *PERMANENT_ERROR_CLASSES => e
Rails.logger.error("[Journaled] Kinesis event send failed (permanent): #{e.class} - #{e.message}")
error_code = e.class.to_s
Outbox::MetricEmitter.emit_kinesis_failure(event:, error_code:)

Journaled::KinesisFailedEvent.new(
event:,
error_code: e.class.to_s,
error_code:,
error_message: e.message,
transient: false,
)
rescue StandardError => e
Rails.logger.error("[Journaled] Kinesis event send failed (transient): #{e.class} - #{e.message}")
error_code = e.class.to_s
Outbox::MetricEmitter.emit_kinesis_failure(event:, error_code:)

Journaled::KinesisFailedEvent.new(
event:,
error_code: e.class.to_s,
error_code:,
error_message: e.message,
transient: true,
)
Expand Down
139 changes: 74 additions & 65 deletions lib/journaled/outbox/metric_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,83 +2,92 @@

module Journaled
module Outbox
# Handles metric emission for the Worker
# Handles metric emission for the Worker and Kinesis senders
#
# This class is responsible for collecting and emitting metrics about the outbox queue.
# This class provides utility methods for collecting and emitting metrics.
class MetricEmitter
def initialize(worker_id:)
@worker_id = worker_id
end
class << self
# Emit batch processing metrics
#
# @param stats [Hash] Processing statistics with :succeeded, :failed_permanently, :failed_transiently
# @param worker_id [String] ID of the worker processing the batch
def emit_batch_metrics(stats, worker_id:)
total_events = stats[:succeeded] + stats[:failed_permanently] + stats[:failed_transiently]

# Emit batch processing metrics
#
# @param stats [Hash] Processing statistics with :succeeded, :failed_permanently, :failed_transiently
def emit_batch_metrics(stats)
total_events = stats[:succeeded] + stats[:failed_permanently] + stats[:failed_transiently]
emit_metric('journaled.worker.batch_process', value: total_events, worker_id:)
emit_metric('journaled.worker.batch_sent', value: stats[:succeeded], worker_id:)
emit_metric('journaled.worker.batch_failed_permanently', value: stats[:failed_permanently], worker_id:)
emit_metric('journaled.worker.batch_failed_transiently', value: stats[:failed_transiently], worker_id:)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
emit_metric('journaled.worker.batch_process', value: total_events, worker_id:)
emit_metric('journaled.worker.batch_sent', value: stats[:succeeded], worker_id:)
emit_metric('journaled.worker.batch_failed_permanently', value: stats[:failed_permanently], worker_id:)
emit_metric('journaled.worker.batch_failed_transiently', value: stats[:failed_transiently], worker_id:)
emit_metric('journaled.worker.batch.processed', value: total_events, worker_id:)
emit_metric('journaled.worker.batch.sent', value: stats[:succeeded], worker_id:)
emit_metric('journaled.worker.batch.failed_permanently', value: stats[:failed_permanently], worker_id:)
emit_metric('journaled.worker.batch.failed_transiently', value: stats[:failed_transiently], worker_id:)

🤷 Nothing says we can't have more than three `.` separators in a stat name -- but mainly I renamed `process` to `processed`.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I'm wondering if we need batch_ at all. We're emitting stats in batches, but the stats themselves are counts of individual events:

Suggested change
emit_metric('journaled.worker.batch_process', value: total_events, worker_id:)
emit_metric('journaled.worker.batch_sent', value: stats[:succeeded], worker_id:)
emit_metric('journaled.worker.batch_failed_permanently', value: stats[:failed_permanently], worker_id:)
emit_metric('journaled.worker.batch_failed_transiently', value: stats[:failed_transiently], worker_id:)
emit_metric('journaled.event.processed', value: total_events, worker_id:)
emit_metric('journaled.event.sent', value: stats[:succeeded], worker_id:)
emit_metric('journaled.event.failed_permanently', value: stats[:failed_permanently], worker_id:)
emit_metric('journaled.event.failed_transiently', value: stats[:failed_transiently], worker_id:)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, any thoughts on `event.failed_permanently` and `event.failed_transiently` becoming `event.failed` and `event.errored`? (Aligning with DJ terminology.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, all sounds good! Will update.

end

emit_metric('journaled.worker.batch_process', value: total_events)
emit_metric('journaled.worker.batch_sent', value: stats[:succeeded])
emit_metric('journaled.worker.batch_failed_permanently', value: stats[:failed_permanently])
emit_metric('journaled.worker.batch_failed_transiently', value: stats[:failed_transiently])
end
# Collect and emit queue metrics
#
# This calculates various queue statistics and emits individual metrics for each.
# @param worker_id [String] ID of the worker collecting metrics
def emit_queue_metrics(worker_id:)
metrics = calculate_queue_metrics

# Collect and emit queue metrics
#
# This calculates various queue statistics and emits individual metrics for each.
def emit_queue_metrics
metrics = calculate_queue_metrics
emit_metric('journaled.worker.queue_total_count', value: metrics[:total_count], worker_id:)
emit_metric('journaled.worker.queue_workable_count', value: metrics[:workable_count], worker_id:)
emit_metric('journaled.worker.queue_erroring_count', value: metrics[:erroring_count], worker_id:)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
emit_metric('journaled.worker.queue_erroring_count', value: metrics[:erroring_count], worker_id:)
emit_metric('journaled.worker.queue_failed_count', value: metrics[:erroring_count], worker_id:)

Should it be "failed count"? If it's a batch query it can really only count the number of hard-failed rows, not the number of transiently-erroring rows, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, updated!

emit_metric('journaled.worker.queue_oldest_age_seconds', value: metrics[:oldest_age_seconds], worker_id:)

emit_metric('journaled.worker.queue_total_count', value: metrics[:total_count])
emit_metric('journaled.worker.queue_workable_count', value: metrics[:workable_count])
emit_metric('journaled.worker.queue_erroring_count', value: metrics[:erroring_count])
emit_metric('journaled.worker.queue_oldest_age_seconds', value: metrics[:oldest_age_seconds])
Rails.logger.info(
"Queue metrics: total=#{metrics[:total_count]}, " \
"workable=#{metrics[:workable_count]}, " \
"erroring=#{metrics[:erroring_count]}, " \
"oldest_age=#{metrics[:oldest_age_seconds].round(2)}s",
)
end

Rails.logger.info(
"Queue metrics: total=#{metrics[:total_count]}, " \
"workable=#{metrics[:workable_count]}, " \
"erroring=#{metrics[:erroring_count]}, " \
"oldest_age=#{metrics[:oldest_age_seconds].round(2)}s",
)
end
# Emit a metric notification for a Kinesis send failure
#
# @param event [Journaled::Outbox::Event] The failed event
# @param error_code [String] The error code (e.g., 'ProvisionedThroughputExceededException')
def emit_kinesis_failure(event:, error_code:)
emit_metric(
'journaled.kinesis.send_failure',
partition_key: event.partition_key,
error_code:,
stream_name: event.stream_name,
event_type: event.event_type,
)
end

private
private

attr_reader :worker_id

# Emit a single metric notification
#
# @param event_name [String] The name of the metric event
# @param payload [Hash] Additional payload data (event_count, value, etc.)
def emit_metric(event_name, payload)
ActiveSupport::Notifications.instrument(
event_name,
payload.merge(worker_id:),
)
end
# Emit a single metric notification
#
# @param event_name [String] The name of the metric event
# @param payload [Hash] Additional payload data (event_count, value, etc.)
def emit_metric(event_name, payload)
ActiveSupport::Notifications.instrument(event_name, payload)
end

# Calculate queue metrics
#
# @return [Hash] Metrics including counts and oldest event timestamp
def calculate_queue_metrics
# Use a single query with COUNT(*) FILTER to calculate all counts in one table scan
result = Event.connection.select_one(
Event.select(
'COUNT(*) AS total_count',
'COUNT(*) FILTER (WHERE failed_at IS NULL) AS workable_count',
'COUNT(*) FILTER (WHERE failure_reason IS NOT NULL AND failed_at IS NULL) AS erroring_count',
'MIN(created_at) FILTER (WHERE failed_at IS NULL) AS oldest_non_failed_timestamp',
).to_sql,
)
# Calculate queue metrics
#
# @return [Hash] Metrics including counts and oldest event timestamp
def calculate_queue_metrics
# Use a single query with COUNT(*) FILTER to calculate all counts in one table scan
result = Event.connection.select_one(
Event.select(
'COUNT(*) AS total_count',
'COUNT(*) FILTER (WHERE failed_at IS NULL) AS workable_count',
'COUNT(*) FILTER (WHERE failure_reason IS NOT NULL AND failed_at IS NULL) AS erroring_count',
'MIN(created_at) FILTER (WHERE failed_at IS NULL) AS oldest_non_failed_timestamp',
).to_sql,
)

oldest_timestamp = result['oldest_non_failed_timestamp']
oldest_age_seconds = oldest_timestamp ? Time.current - oldest_timestamp : 0
oldest_timestamp = result['oldest_non_failed_timestamp']
oldest_age_seconds = oldest_timestamp ? Time.current - oldest_timestamp : 0

{
total_count: result['total_count'],
workable_count: result['workable_count'],
erroring_count: result['erroring_count'],
oldest_age_seconds:,
}
{
total_count: result['total_count'],
workable_count: result['workable_count'],
erroring_count: result['erroring_count'],
oldest_age_seconds:,
}
end
end
end
end
Expand Down
7 changes: 3 additions & 4 deletions lib/journaled/outbox/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def initialize
@worker_id = "#{Socket.gethostname}-#{Process.pid}"
self.running = false
@processor = BatchProcessor.new
@metric_emitter = MetricEmitter.new(worker_id: @worker_id)
self.shutdown_requested = false
@last_metrics_emission = Time.current
end
Expand Down Expand Up @@ -50,7 +49,7 @@ def running?

private

attr_reader :worker_id, :processor, :metric_emitter
attr_reader :worker_id, :processor
attr_accessor :shutdown_requested, :running, :last_metrics_emission

def run_loop
Expand All @@ -77,7 +76,7 @@ def run_loop
def process_batch
stats = processor.process_batch

metric_emitter.emit_batch_metrics(stats)
MetricEmitter.emit_batch_metrics(stats, worker_id:)
end

def check_prerequisites!
Expand Down Expand Up @@ -120,7 +119,7 @@ def emit_metrics_if_needed

# Collect and emit queue metrics
def collect_and_emit_metrics
metric_emitter.emit_queue_metrics
MetricEmitter.emit_queue_metrics(worker_id:)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/journaled/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Journaled
VERSION = "6.2.2"
VERSION = "6.2.3"
end
Loading