Skip to content

Commit e25b6c6

Browse files
committed
Add database-backed event processing with Outbox pattern
Introduce an optional Outbox pattern implementation as an alternative to ActiveJob-based delivery. Events can now be stored in database tables and processed by background workers, providing better reliability and observability for event delivery. This implementation includes: Models & Storage: - Outbox::Base model with common validations - Outbox::Event model with distributed locking for concurrent workers - Outbox::DeadLetter model for failed events with requeue support - Migration generator for creating required database tables Processing Infrastructure: - Outbox::Worker daemon with graceful shutdown and signal handling - Outbox::BatchProcessor with retry logic and dead letter queue - Outbox::Adapter implementing the DeliveryAdapter interface - Rake task for running workers (rake journaled:work) Configuration & Documentation: - New configuration options: outbox_base_class_name, worker_batch_size, worker_poll_interval, worker_lock_timeout, worker_max_attempts - Comprehensive README documentation with setup instructions - Examples for monitoring and DLQ handling Testing: - Complete test coverage for all Outbox components - Integration tests for end-to-end event processing - Test database schema with Outbox tables The default behavior remains unchanged (ActiveJobAdapter). Users can opt into database-backed processing by configuring the Outbox::Adapter.
1 parent 2df7338 commit e25b6c6

File tree

19 files changed

+2001
-3
lines changed

19 files changed

+2001
-3
lines changed

README.md

Lines changed: 138 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ durable, eventually consistent record that discrete events happened.
2222

2323
## Installation
2424

25-
1. If you haven't already,
26-
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
27-
to use one of the following queue adapters:
25+
1. **Configure a queue adapter** (only required if using the default ActiveJob delivery adapter):
26+
27+
If you haven't already,
28+
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
29+
to use one of the following queue adapters:
2830

2931
- `:delayed_job` (via `delayed_job_active_record`)
3032
- `:que`
@@ -52,6 +54,8 @@ to use one of the following queue adapters:
5254
5355
This configuration isn't necessary for applications running Rails 8+.
5456

57+
**Note:** If you plan to use the [Database-Backed Event Processing](#database-backed-event-processing-optional) (Outbox adapter), you can skip this step entirely, as the Outbox adapter does not use ActiveJob.
58+
5559
2. To integrate Journaled into your application, simply include the gem in your
5660
app's Gemfile.
5761
@@ -129,6 +133,37 @@ Journaling provides a number of different configuation options that can be set i
129133
130134
The number of seconds before the :http_handler should timeout while waiting for a HTTP response.
131135
136+
#### `Journaled.delivery_adapter` (default: `Journaled::DeliveryAdapters::ActiveJobAdapter`)
137+
138+
Determines how events are delivered to Kinesis. Two options are available:
139+
140+
- **`Journaled::DeliveryAdapters::ActiveJobAdapter`** (default) - Enqueues events to ActiveJob. Requires a DB-backed queue adapter (see Installation).
141+
142+
- **`Journaled::Outbox::Adapter`** - Stores events in a database table and processes them via separate worker daemons. See [Database-Backed Event Processing](#database-backed-event-processing-optional) for setup instructions.
143+
144+
Example:
145+
```ruby
146+
# Use the database-backed Outbox adapter
147+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
148+
```
149+
150+
#### `Journaled.outbox_base_class_name` (default: `'ActiveRecord::Base'`)
151+
152+
**Only relevant when using `Journaled::Outbox::Adapter`.**
153+
154+
Specifies which ActiveRecord base class the Outbox event storage models (`Journaled::Outbox::Event` and `Journaled::Outbox::DeadLetter`) should use for their database connection. This is useful for multi-database setups where you want to store events in a separate database.
155+
156+
Example:
157+
```ruby
158+
# Store outbox events in a separate database
159+
class EventsRecord < ActiveRecord::Base
160+
self.abstract_class = true
161+
connects_to database: { writing: :events, reading: :events }
162+
end
163+
164+
Journaled.outbox_base_class_name = 'EventsRecord'
165+
```
166+
132167
#### ActiveJob `set` options
133168
134169
Both model-level directives accept additional options to be passed into ActiveJob's `set` method:
@@ -143,6 +178,106 @@ has_audit_log enqueue_with: { priority: 30 }
143178
# Or for custom journaling:
144179
journal_attributes :email, enqueue_with: { priority: 20, queue: 'journaled' }
145180
```
181+
##### Database-Backed Event Processing (Optional)
182+
183+
Journaled includes a built-in database-backed delivery adapter with horizontally scalable workers. This is useful if you want to:
184+
185+
- **Decouple from Delayed Job/ActiveJob**: Process events independently of your existing job queue
186+
- **Horizontal scaling**: Run multiple worker daemons across different servers/containers
187+
- **Batch API efficiency**: Use Kinesis `put_records` (batch) API instead of `put_record` (single), reducing API calls by up to 500x
188+
- **Custom retry logic**: Fine-tune retry behavior and dead letter queue handling
189+
- **Visibility**: Events are stored in database tables where you can inspect, query, and monitor them
190+
191+
**Setup:**
192+
193+
This feature requires creating database tables and is completely optional. Existing users are unaffected.
194+
195+
1. **Generate migrations:**
196+
197+
```bash
198+
rails generate journaled:database_events
199+
rails db:migrate
200+
```
201+
202+
This creates two tables:
203+
- `journaled_events` - Queue of events to be processed
204+
- `journaled_dead_letter_events` - Failed events after max retries
205+
206+
2. **Configure to use the database adapter:**
207+
208+
```ruby
209+
# config/initializers/journaled.rb
210+
211+
# Use the database-backed Outbox adapter instead of ActiveJob
212+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
213+
214+
# Optional: Customize worker behavior (these are the defaults)
215+
Journaled.worker_batch_size = 500 # Max events per Kinesis batch (Kinesis API limit)
216+
Journaled.worker_poll_interval = 5 # Seconds between polls
217+
Journaled.worker_lock_timeout = 300 # Seconds before stale lock can be reclaimed
218+
Journaled.worker_max_attempts = 3 # Retries before moving to DLQ
219+
```
220+
221+
**Note:** When using the Outbox adapter, you do **not** need to configure an ActiveJob queue adapter (skip step 1 of Installation). The Outbox adapter uses the `journaled_events` table for event storage and its own worker daemons for processing, making it independent of ActiveJob. Transactional batching still works seamlessly with the Outbox adapter.
222+
223+
3. **Start worker daemon(s):**
224+
225+
```bash
226+
bundle exec rake journaled_worker:work
227+
```
228+
229+
4. **Monitoring:**
230+
231+
The system emits `ActiveSupport::Notifications` events:
232+
233+
```ruby
234+
# config/initializers/journaled.rb
235+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_sent') do |name, start, finish, id, payload|
236+
Statsd.increment('journaled.worker.events_sent', payload[:succeeded])
237+
end
238+
239+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_failed') do |name, start, finish, id, payload|
240+
Statsd.increment('journaled.worker.events_failed', payload[:failed])
241+
end
242+
```
243+
244+
5. **Dead Letter Queue:**
245+
246+
Inspect and requeue failed events:
247+
248+
```ruby
249+
# Find failed events
250+
Journaled::Outbox::DeadLetter.where(stream_name: 'my_stream')
251+
Journaled::Outbox::DeadLetter.failed_since(1.hour.ago)
252+
253+
# Requeue an event (creates new pending event, removes from DLQ)
254+
dlq_event = Journaled::Outbox::DeadLetter.find(123)
255+
dlq_event.requeue!
256+
```
257+
258+
6. **Production Deployment:**
259+
260+
Deploy workers as separate services (systemd, Docker, Kubernetes, etc.):
261+
262+
```yaml
263+
# Example Kubernetes deployment
264+
apiVersion: apps/v1
265+
kind: Deployment
266+
metadata:
267+
name: journaled-worker
268+
spec:
269+
replicas: 3
270+
template:
271+
spec:
272+
containers:
273+
- name: worker
274+
image: myapp:latest
275+
command: ["bundle", "exec", "rake", "journaled_worker:work"]
276+
env:
277+
- name: AWS_DEFAULT_REGION
278+
value: "us-east-1"
279+
# ... other env vars
280+
```
146281

147282
### Attribution
148283

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# frozen_string_literal: true
2+
3+
module Journaled
4+
module Outbox
5+
# Provides common validations and scopes for event storage models.
6+
class Base < Journaled.outbox_base_class_name.constantize
7+
self.abstract_class = true
8+
9+
attribute :event_data, :json
10+
11+
validates :event_type, :event_data, :partition_key, :stream_name, presence: true
12+
validates :attempts, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
13+
end
14+
end
15+
end
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# frozen_string_literal: true
2+
3+
module Journaled
4+
module Outbox
5+
# ActiveRecord model for failed events that have been moved to the dead letter queue
6+
#
7+
# Events end up here after reaching max retry attempts or encountering permanent failures.
8+
# These can be inspected manually and potentially requeued for processing.
9+
class DeadLetter < Base
10+
self.table_name = 'journaled_dead_letter_events'
11+
12+
validates :failure_reason, :failed_at, presence: true
13+
14+
scope :failed_since, ->(time) { where('failed_at >= ?', time) }
15+
16+
# Requeue this event for processing
17+
#
18+
# Creates a new Event with the same attributes and removes this DLQ entry
19+
#
20+
# @return [Journaled::Outbox::Event] The newly created event
21+
def requeue!
22+
transaction do
23+
new_event = Event.create!(
24+
event_type:,
25+
event_data:,
26+
partition_key:,
27+
stream_name:,
28+
attempts: 0,
29+
)
30+
31+
destroy!
32+
33+
new_event
34+
end
35+
end
36+
end
37+
end
38+
end
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# frozen_string_literal: true
2+
3+
module Journaled
4+
module Outbox
5+
# ActiveRecord model for database-backed event processing
6+
#
7+
# This model is only used when the Outbox::Adapter is configured.
8+
# Events are stored in the database and processed by worker daemons.
9+
#
10+
# Successfully delivered events are deleted immediately.
11+
class Event < Base
12+
self.table_name = 'journaled_events'
13+
14+
scope :ready_to_process, -> {
15+
where('locked_at IS NULL OR locked_at < ?', Time.current - Journaled.worker_lock_timeout.seconds)
16+
.order(:created_at)
17+
}
18+
19+
# Claim a batch of events for processing
20+
#
21+
# Uses SELECT FOR UPDATE SKIP LOCKED for distributed locking across multiple workers
22+
#
23+
# @param worker_id [String] Unique identifier for this worker (e.g., "hostname-pid")
24+
# @return [Array<Journaled::Outbox::Event>] Claimed events
25+
def self.claim_batch!(worker_id:)
26+
if connection.adapter_name == 'PostgreSQL'
27+
claim_batch_postgresql!(worker_id:)
28+
else
29+
claim_batch_generic!(worker_id:)
30+
end
31+
end
32+
33+
# Optimized PostgreSQL implementation using CTE with UPDATE ... FROM
34+
def self.claim_batch_postgresql!(worker_id:)
35+
timeout = Time.current - Journaled.worker_lock_timeout.seconds
36+
batch_size = Journaled.worker_batch_size
37+
locked_at = Time.current
38+
39+
sql = <<-SQL.squish
40+
WITH locked_events AS (
41+
SELECT id
42+
FROM journaled_events
43+
WHERE locked_at IS NULL OR locked_at < ?
44+
ORDER BY created_at
45+
LIMIT ?
46+
FOR UPDATE SKIP LOCKED
47+
)
48+
UPDATE journaled_events
49+
SET locked_at = ?,
50+
locked_by = ?,
51+
attempts = attempts + 1
52+
FROM locked_events
53+
WHERE journaled_events.id = locked_events.id
54+
RETURNING journaled_events.*
55+
SQL
56+
57+
find_by_sql([sql, timeout, batch_size, locked_at, worker_id])
58+
end
59+
60+
# Generic implementation for other databases (SQLite, etc.)
61+
def self.claim_batch_generic!(worker_id:)
62+
transaction do
63+
events = ready_to_process
64+
.limit(Journaled.worker_batch_size)
65+
.lock('FOR UPDATE SKIP LOCKED')
66+
.to_a
67+
68+
event_ids = events.map(&:id)
69+
locked_at = Time.current
70+
71+
# Use RETURNING to get updated records in one query
72+
sql = <<-SQL.squish
73+
UPDATE #{table_name}
74+
SET locked_at = ?,
75+
locked_by = ?,
76+
attempts = attempts + 1
77+
WHERE id IN (?)
78+
RETURNING *
79+
SQL
80+
81+
find_by_sql([sql, locked_at, worker_id, event_ids])
82+
end
83+
end
84+
85+
# Mark event as successfully delivered by deleting it
86+
def mark_delivered!
87+
destroy!
88+
end
89+
90+
# Release lock on event (for transient failures, allowing retry)
91+
def release_lock!(error: nil)
92+
update!(
93+
locked_at: nil,
94+
locked_by: nil,
95+
last_error: error&.message,
96+
)
97+
end
98+
99+
# Move event to dead letter queue (for permanent failures)
100+
#
101+
# This happens in a transaction to ensure the event is never lost.
102+
# The event is moved to the DLQ and then deleted in the same transaction.
103+
#
104+
# @param error [Exception] The error that caused the permanent failure
105+
def move_to_dlq!(error:)
106+
transaction do
107+
DeadLetter.create!(
108+
event_type:,
109+
event_data:,
110+
partition_key:,
111+
stream_name:,
112+
attempts:,
113+
failure_reason: "#{error.class}: #{error.message}",
114+
failed_at: Time.current,
115+
originally_created_at: created_at,
116+
)
117+
118+
# Delete the event so it won't be processed again
119+
destroy!
120+
end
121+
end
122+
123+
# Check if event has reached maximum retry attempts
124+
def max_attempts_reached?
125+
attempts >= Journaled.worker_max_attempts
126+
end
127+
end
128+
end
129+
end
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# frozen_string_literal: true
2+
3+
require 'rails/generators'
4+
require 'rails/generators/active_record'
5+
6+
module Journaled
7+
module Generators
8+
class DatabaseEventsGenerator < Rails::Generators::Base
9+
include ActiveRecord::Generators::Migration
10+
11+
source_root File.expand_path('templates', __dir__)
12+
13+
desc "Generates migrations for journaled database-backed event processing"
14+
15+
def create_journaled_events_migration
16+
migration_template(
17+
"create_journaled_events.rb.erb",
18+
"db/migrate/create_journaled_events.rb",
19+
migration_version:,
20+
)
21+
end
22+
23+
def create_journaled_dead_letter_events_migration
24+
migration_template(
25+
"create_journaled_dead_letter_events.rb.erb",
26+
"db/migrate/create_journaled_dead_letter_events.rb",
27+
migration_version:,
28+
)
29+
end
30+
31+
def show_readme
32+
readme "README" if behavior == :invoke
33+
end
34+
35+
private
36+
37+
def migration_version
38+
"[#{ActiveRecord::VERSION::MAJOR}.#{ActiveRecord::VERSION::MINOR}]"
39+
end
40+
end
41+
end
42+
end

0 commit comments

Comments
 (0)