Skip to content

Commit 9d02236

Browse files
committed
Refactor Kinesis delivery and introduce pluggable adapter pattern
Extract Kinesis client creation and batch sending logic into reusable components. Introduce a DeliveryAdapter interface to support alternative event delivery mechanisms while maintaining backward compatibility with the existing ActiveJob-based delivery. Changes: - Extract KinesisClientFactory for centralized Kinesis client creation - Extract KinesisBatchSender for shared batch sending logic - Introduce DeliveryAdapter base class defining adapter interface - Create ActiveJobAdapter wrapping existing ActiveJob delivery behavior - Refactor Writer to use delivery_adapter.deliver() instead of direct job enqueueing - Refactor Connection to delegate to delivery_adapter.transaction_connection - Update Engine to use adapter.validate_configuration! instead of detect_queue_adapter! - Update DeliveryJob to use KinesisClientFactory All existing behavior is preserved through the ActiveJobAdapter, which is the default adapter. No breaking changes for existing users.
1 parent 14f5650 commit 9d02236

File tree

16 files changed

+820
-187
lines changed

16 files changed

+820
-187
lines changed

app/jobs/journaled/delivery_job.rb

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
module Journaled
44
class DeliveryJob < ApplicationJob
5-
DEFAULT_REGION = 'us-east-1'
6-
75
rescue_from(Aws::Kinesis::Errors::InternalFailure, Aws::Kinesis::Errors::ServiceUnavailable, Aws::Kinesis::Errors::Http503Error) do |e|
86
Rails.logger.error "Kinesis Error - Server Error occurred - #{e.class}"
97
raise KinesisTemporaryFailure
@@ -20,16 +18,6 @@ def perform(*events)
2018
journal! if Journaled.enabled?
2119
end
2220

23-
def kinesis_client_config
24-
{
25-
region: ENV.fetch('AWS_DEFAULT_REGION', DEFAULT_REGION),
26-
retry_limit: 0,
27-
http_idle_timeout: Journaled.http_idle_timeout,
28-
http_open_timeout: Journaled.http_open_timeout,
29-
http_read_timeout: Journaled.http_read_timeout,
30-
}.merge(credentials)
31-
end
32-
3321
private
3422

3523
KinesisRecord = Struct.new(:serialized_event, :partition_key, :stream_name, keyword_init: true) do
@@ -51,42 +39,7 @@ def journal!
5139
end
5240

5341
def kinesis_client
54-
Aws::Kinesis::Client.new(kinesis_client_config)
55-
end
56-
57-
def credentials
58-
if ENV.key?('JOURNALED_IAM_ROLE_ARN')
59-
{
60-
credentials: iam_assume_role_credentials,
61-
}
62-
else
63-
legacy_credentials_hash_if_present
64-
end
65-
end
66-
67-
def legacy_credentials_hash_if_present
68-
if ENV.key?('RUBY_AWS_ACCESS_KEY_ID')
69-
{
70-
access_key_id: ENV.fetch('RUBY_AWS_ACCESS_KEY_ID'),
71-
secret_access_key: ENV.fetch('RUBY_AWS_SECRET_ACCESS_KEY'),
72-
}
73-
else
74-
{}
75-
end
76-
end
77-
78-
def sts_client
79-
Aws::STS::Client.new({
80-
region: ENV.fetch('AWS_DEFAULT_REGION', DEFAULT_REGION),
81-
}.merge(legacy_credentials_hash_if_present))
82-
end
83-
84-
def iam_assume_role_credentials
85-
@iam_assume_role_credentials ||= Aws::AssumeRoleCredentials.new(
86-
client: sts_client,
87-
role_arn: ENV.fetch('JOURNALED_IAM_ROLE_ARN'),
88-
role_session_name: "JournaledAssumeRoleAccess",
89-
)
42+
@kinesis_client ||= KinesisClientFactory.build
9043
end
9144

9245
class KinesisTemporaryFailure < NotTrulyExceptionalError

app/models/journaled/writer.rb

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,13 @@ def self.enqueue!(*events)
4242
events.group_by(&:journaled_enqueue_opts).each do |enqueue_opts, batch|
4343
job_opts = enqueue_opts.reverse_merge(priority: Journaled.job_priority)
4444
ActiveSupport::Notifications.instrument('journaled.batch.enqueue', batch: batch, **job_opts) do
45-
Journaled::DeliveryJob.set(job_opts).perform_later(*delivery_perform_args(batch))
45+
Journaled.delivery_adapter.deliver(events: batch, enqueue_opts: job_opts)
4646

4747
batch.each { |event| ActiveSupport::Notifications.instrument('journaled.event.enqueue', event: event, **job_opts) }
4848
end
4949
end
5050
end
5151

52-
def self.delivery_perform_args(events)
53-
events.map do |event|
54-
{
55-
serialized_event: event.journaled_attributes.to_json,
56-
partition_key: event.journaled_partition_key,
57-
stream_name: event.journaled_stream_name,
58-
}
59-
end
60-
end
61-
6252
private
6353

6454
attr_reader :journaled_event

lib/journaled.rb

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
require "journaled/current"
99
require "journaled/errors"
1010
require 'journaled/connection'
11+
require 'journaled/delivery_adapter'
12+
require 'journaled/delivery_adapters/active_job_adapter'
13+
require 'journaled/kinesis_client_factory'
14+
require 'journaled/kinesis_batch_sender'
1115

1216
module Journaled
1317
SUPPORTED_QUEUE_ADAPTERS = %w(delayed delayed_job good_job que).freeze
@@ -18,6 +22,7 @@ module Journaled
1822
mattr_accessor(:http_open_timeout) { 2 }
1923
mattr_accessor(:http_read_timeout) { 60 }
2024
mattr_accessor(:job_base_class_name) { 'ActiveJob::Base' }
25+
mattr_accessor(:delivery_adapter) { Journaled::DeliveryAdapters::ActiveJobAdapter }
2126
mattr_writer(:transactional_batching_enabled) { true }
2227

2328
def self.transactional_batching_enabled?
@@ -56,21 +61,6 @@ def self.queue_adapter
5661
job_base_class_name.constantize.queue_adapter_name
5762
end
5863

59-
def self.detect_queue_adapter!
60-
unless SUPPORTED_QUEUE_ADAPTERS.include?(queue_adapter)
61-
raise <<~MSG
62-
Journaled has detected an unsupported ActiveJob queue adapter: `:#{queue_adapter}`
63-
64-
Journaled jobs must be enqueued transactionally to your primary database.
65-
66-
Please install the appropriate gems and set `queue_adapter` to one of the following:
67-
#{SUPPORTED_QUEUE_ADAPTERS.map { |a| "- `:#{a}`" }.join("\n")}
68-
69-
Read more at https://github.com/Betterment/journaled
70-
MSG
71-
end
72-
end
73-
7464
def self.tagged(**tags)
7565
existing_tags = Current.tags
7666
tag!(**tags)

lib/journaled/connection.rb

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,7 @@ def transaction_open?
2222
end
2323

2424
def connection
25-
if Journaled.queue_adapter.in? %w(delayed delayed_job)
26-
Delayed::Job.connection
27-
elsif Journaled.queue_adapter == 'good_job'
28-
GoodJob::BaseRecord.connection
29-
elsif Journaled.queue_adapter == 'que'
30-
Que::ActiveRecord::Model.connection
31-
elsif Journaled.queue_adapter == 'test' && Rails.env.test?
32-
ActiveRecord::Base.connection
33-
else
34-
raise "Unsupported adapter: #{Journaled.queue_adapter}"
35-
end
25+
Journaled.delivery_adapter.transaction_connection
3626
end
3727
end
3828

lib/journaled/delivery_adapter.rb

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# frozen_string_literal: true
2+
3+
module Journaled
4+
# Base class for delivery adapters
5+
#
6+
# Journaled ships with two delivery adapters:
7+
# - Journaled::DeliveryAdapters::ActiveJobAdapter (default) - delivers via ActiveJob
8+
# - Journaled::Outbox::Adapter - delivers via database-backed workers
9+
#
10+
class DeliveryAdapter
11+
# Delivers a batch of events
12+
#
13+
# @param events [Array] Array of journaled events to deliver
14+
# @param enqueue_opts [Hash] Options for delivery (priority, queue, wait, wait_until, etc.)
15+
# @return [void]
16+
def self.deliver(events:, enqueue_opts:) # rubocop:disable Lint/UnusedMethodArgument
17+
raise NoMethodError, "#{name} must implement .deliver(events:, enqueue_opts:)"
18+
end
19+
20+
# Returns the database connection to use for transactional batching
21+
#
22+
# This allows delivery adapters to specify which database connection should be used
23+
# when staging events during a transaction. This is only needed if you want to support
24+
# transactional batching with your adapter.
25+
#
26+
# @return [ActiveRecord::ConnectionAdapters::AbstractAdapter]
27+
def self.transaction_connection
28+
raise NoMethodError, "#{name} must implement .transaction_connection"
29+
end
30+
31+
# Validates that the adapter is properly configured
32+
#
33+
# Called during Rails initialization in production mode. Raise an error if the adapter
34+
# is not configured correctly (e.g., missing required dependencies, invalid configuration).
35+
#
36+
# @return [void]
37+
def self.validate_configuration!
38+
# Default: no validation required
39+
end
40+
end
41+
end
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# frozen_string_literal: true
2+
3+
module Journaled
4+
module DeliveryAdapters
5+
# Default delivery adapter that uses ActiveJob
6+
#
7+
# This adapter enqueues events to Journaled::DeliveryJob which
8+
# sends them to Kinesis. This is the default behavior and maintains
9+
# backward compatibility with previous versions of the gem.
10+
class ActiveJobAdapter < Journaled::DeliveryAdapter
11+
# Delivers events by enqueueing them to Journaled::DeliveryJob
12+
#
13+
# @param events [Array] Array of journaled events to deliver
14+
# @param enqueue_opts [Hash] Options for ActiveJob (priority, queue, wait, wait_until, etc.)
15+
# @return [void]
16+
def self.deliver(events:, enqueue_opts:)
17+
Journaled::DeliveryJob.set(enqueue_opts).perform_later(*delivery_perform_args(events))
18+
end
19+
20+
# Serializes events into the format expected by DeliveryJob
21+
#
22+
# @param events [Array] Array of journaled events
23+
# @return [Array<Hash>] Array of serialized event hashes
24+
def self.delivery_perform_args(events)
25+
events.map do |event|
26+
{
27+
serialized_event: event.journaled_attributes.to_json,
28+
partition_key: event.journaled_partition_key,
29+
stream_name: event.journaled_stream_name,
30+
}
31+
end
32+
end
33+
34+
# Returns the database connection to use for transactional batching
35+
#
36+
# This is determined by the configured queue adapter, since ActiveJob
37+
# enqueues jobs to the same database that should be used for transactions.
38+
#
39+
# @return [ActiveRecord::ConnectionAdapters::AbstractAdapter] The connection to use
40+
def self.transaction_connection
41+
queue_adapter = Journaled.queue_adapter
42+
43+
if queue_adapter.in? %w(delayed delayed_job)
44+
Delayed::Job.connection
45+
elsif queue_adapter == 'good_job'
46+
GoodJob::BaseRecord.connection
47+
elsif queue_adapter == 'que'
48+
Que::ActiveRecord::Model.connection
49+
elsif queue_adapter == 'test' && Rails.env.test?
50+
ActiveRecord::Base.connection
51+
else
52+
raise "Unsupported queue adapter: #{queue_adapter}"
53+
end
54+
end
55+
56+
# Validates that a supported queue adapter is configured
57+
#
58+
# @return [void]
59+
def self.validate_configuration!
60+
unless Journaled::SUPPORTED_QUEUE_ADAPTERS.include?(Journaled.queue_adapter)
61+
raise <<~MSG
62+
Journaled has detected an unsupported ActiveJob queue adapter: `:#{Journaled.queue_adapter}`
63+
64+
Journaled jobs must be enqueued transactionally to your primary database.
65+
66+
Please install the appropriate gems and set `queue_adapter` to one of the following:
67+
#{Journaled::SUPPORTED_QUEUE_ADAPTERS.map { |a| "- `:#{a}`" }.join("\n")}
68+
69+
Read more at https://github.com/Betterment/journaled
70+
MSG
71+
end
72+
end
73+
end
74+
end
75+
end

lib/journaled/engine.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ module Journaled
44
class Engine < ::Rails::Engine
55
config.after_initialize do
66
ActiveSupport.on_load(:active_job) do
7-
Journaled.detect_queue_adapter! unless Journaled.development_or_test?
7+
Journaled.delivery_adapter.validate_configuration! unless Journaled.development_or_test?
88
end
99

1010
ActiveSupport.on_load(:active_record) do

0 commit comments

Comments
 (0)