Skip to content

Commit b28d747

Browse files
committed
Add database-backed event processing with Outbox pattern
This commit introduces a database-backed outbox pattern for reliable event processing as an alternative to direct Kinesis delivery. This enables transactional event guarantees and better reliability. Key changes: - Add Journaled::Outbox::Adapter for database-backed event delivery - Add Journaled::Outbox::Event model with UUID v7 primary keys - Add Journaled::Outbox::Worker for processing events from the database - Add Journaled::Outbox::BatchProcessor for batch delivery to Kinesis - Add Journaled::Outbox::MetricEmitter for monitoring queue health - Add generators for creating database tables and UUID v7 support - Add rake task for running the worker process - Update KinesisBatchSender to work with database events - Add comprehensive test coverage for all outbox components The outbox pattern stores events in PostgreSQL with guaranteed ordering using UUID v7 timestamps, then processes them asynchronously with configurable retry logic and detailed metrics emission.
1 parent 2df7338 commit b28d747

File tree

21 files changed

+1951
-156
lines changed

21 files changed

+1951
-156
lines changed

README.md

Lines changed: 166 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ durable, eventually consistent record that discrete events happened.
2222

2323
## Installation
2424

25-
1. If you haven't already,
26-
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
27-
to use one of the following queue adapters:
25+
1. **Configure a queue adapter** (only required if using the default ActiveJob delivery adapter):
26+
27+
If you haven't already,
28+
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
29+
to use one of the following queue adapters:
2830

2931
- `:delayed_job` (via `delayed_job_active_record`)
3032
- `:que`
@@ -52,6 +54,8 @@ to use one of the following queue adapters:
5254
5355
This configuration isn't necessary for applications running Rails 8+.
5456

57+
**Note:** If you plan to use the [Database-Backed Event Processing](#database-backed-event-processing-optional) (Outbox adapter), you can skip this step entirely, as the Outbox adapter does not use ActiveJob.
58+
5559
2. To integrate Journaled into your application, simply include the gem in your
5660
app's Gemfile.
5761
@@ -129,6 +133,37 @@ Journaling provides a number of different configuation options that can be set i
129133
130134
The number of seconds before the :http_handler should timeout while waiting for a HTTP response.
131135
136+
#### `Journaled.delivery_adapter` (default: `Journaled::DeliveryAdapters::ActiveJobAdapter`)
137+
138+
Determines how events are delivered to Kinesis. Two options are available:
139+
140+
- **`Journaled::DeliveryAdapters::ActiveJobAdapter`** (default) - Enqueues events to ActiveJob. Requires a DB-backed queue adapter (see Installation).
141+
142+
- **`Journaled::Outbox::Adapter`** - Stores events in a database table and processes them via separate worker daemons. See [Database-Backed Event Processing](#database-backed-event-processing-optional) for setup instructions.
143+
144+
Example:
145+
```ruby
146+
# Use the database-backed Outbox adapter
147+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
148+
```
149+
150+
#### `Journaled.outbox_base_class_name` (default: `'ActiveRecord::Base'`)
151+
152+
**Only relevant when using `Journaled::Outbox::Adapter`.**
153+
154+
Specifies which ActiveRecord base class the Outbox event storage model (`Journaled::Outbox::Event`) should use for its database connection. This is useful for multi-database setups where you want to store events in a separate database.
155+
156+
Example:
157+
```ruby
158+
# Store outbox events in a separate database
159+
class EventsRecord < ActiveRecord::Base
160+
self.abstract_class = true
161+
connects_to database: { writing: :events, reading: :events }
162+
end
163+
164+
Journaled.outbox_base_class_name = 'EventsRecord'
165+
```
166+
132167
#### ActiveJob `set` options
133168
134169
Both model-level directives accept additional options to be passed into ActiveJob's `set` method:
@@ -143,6 +178,134 @@ has_audit_log enqueue_with: { priority: 30 }
143178
# Or for custom journaling:
144179
journal_attributes :email, enqueue_with: { priority: 20, queue: 'journaled' }
145180
```
181+
##### Database-Backed Event Processing (Optional)
182+
183+
Journaled includes a built-in database-backed delivery adapter with horizontally scalable workers. This is useful if you want to:
184+
185+
- **Decouple from Delayed Job/ActiveJob**: Process events independently of your existing job queue
186+
- **Horizontal scaling**: Run multiple worker daemons across different servers/containers
187+
- **Batch API efficiency**: Use Kinesis `put_records` (batch) API instead of `put_record` (single), reducing API calls by up to 500x
188+
- **Custom retry logic**: Fine-tune retry behavior and failed event handling
189+
- **Visibility**: Events are stored in database tables where you can inspect, query, and monitor them
190+
191+
**Setup:**
192+
193+
This feature requires creating database tables and is completely optional. Existing users are unaffected.
194+
195+
1. **Generate migrations:**
196+
197+
```bash
198+
rails generate journaled:database_events
199+
rails db:migrate
200+
```
201+
202+
This creates a table for storing events:
203+
- `journaled_outbox_events` - Queue of events to be processed (includes `failed_at` column for tracking failures)
204+
205+
2. **Configure to use the database adapter:**
206+
207+
```ruby
208+
# config/initializers/journaled.rb
209+
210+
# Use the database-backed Outbox adapter instead of ActiveJob
211+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
212+
213+
# Optional: Customize worker behavior (these are the defaults)
214+
Journaled.worker_batch_size = 500 # Max events per Kinesis batch (Kinesis API limit)
215+
Journaled.worker_poll_interval = 5 # Seconds between polls
216+
Journaled.worker_max_attempts = 3 # Retries before marking as failed
217+
```
218+
219+
**Note:** When using the Outbox adapter, you do **not** need to configure an ActiveJob queue adapter (skip step 1 of Installation). The Outbox adapter uses the `journaled_outbox_events` table for event storage and its own worker daemons for processing, making it independent of ActiveJob. Transactional batching still works seamlessly with the Outbox adapter.
220+
221+
3. **Start worker daemon(s):**
222+
223+
```bash
224+
bundle exec rake journaled_worker:work
225+
```
226+
227+
4. **Monitoring:**
228+
229+
The system emits `ActiveSupport::Notifications` events:
230+
231+
```ruby
232+
# config/initializers/journaled.rb
233+
234+
# Emitted for every batch processed (regardless of outcome)
235+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_process') do |name, start, finish, id, payload|
236+
Statsd.increment('journaled.worker.batches', tags: ["worker:#{payload[:worker_id]}"])
237+
end
238+
239+
# Emitted for successfully sent events
240+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_sent') do |name, start, finish, id, payload|
241+
Statsd.increment('journaled.worker.events_sent', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
242+
end
243+
244+
# Emitted for permanently failed events (marked as failed in database)
245+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_failed') do |name, start, finish, id, payload|
246+
Statsd.increment('journaled.worker.events_failed', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
247+
end
248+
249+
# Emitted for transiently failed events (will be retried)
250+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_errored') do |name, start, finish, id, payload|
251+
Statsd.increment('journaled.worker.events_errored', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
252+
end
253+
254+
# Emitted once per minute with queue statistics
255+
ActiveSupport::Notifications.subscribe('journaled.worker.queue_metrics') do |name, start, finish, id, payload|
256+
Statsd.gauge('journaled.worker.queue.total', payload[:total_count], tags: ["worker:#{payload[:worker_id]}"])
257+
Statsd.gauge('journaled.worker.queue.workable', payload[:workable_count], tags: ["worker:#{payload[:worker_id]}"])
258+
Statsd.gauge('journaled.worker.queue.erroring', payload[:erroring_count], tags: ["worker:#{payload[:worker_id]}"])
259+
Statsd.gauge('journaled.worker.queue.oldest_age_seconds', payload[:oldest_age_seconds], tags: ["worker:#{payload[:worker_id]}"]) if payload[:oldest_age_seconds]
260+
end
261+
```
262+
263+
Queue metrics payload includes:
264+
- `total_count` - Total number of events in the queue (including failed)
265+
- `workable_count` - Events ready to be processed (not failed)
266+
- `erroring_count` - Events with errors but not yet marked as permanently failed
267+
- `oldest_non_failed_timestamp` - Timestamp of the oldest non-failed event (extracted from UUID v7)
268+
- `oldest_age_seconds` - Age in seconds of the oldest non-failed event
269+
270+
Note: Metrics are collected in a background thread to avoid blocking the main worker loop.
271+
272+
5. **Failed Events:**
273+
274+
Inspect and requeue failed events:
275+
276+
```ruby
277+
# Find failed events
278+
Journaled::Outbox::Event.failed.where(stream_name: 'my_stream')
279+
Journaled::Outbox::Event.failed_since(1.hour.ago)
280+
281+
# Requeue a failed event (clears failure info and resets attempts)
282+
failed_event = Journaled::Outbox::Event.failed.find(123)
283+
failed_event.requeue!
284+
```
285+
286+
6. **Production Deployment:**
287+
288+
Deploy workers as separate services (systemd, Docker, Kubernetes, etc.):
289+
290+
```yaml
291+
# Example Kubernetes deployment
292+
apiVersion: apps/v1
293+
kind: Deployment
294+
metadata:
295+
name: journaled-worker
296+
spec:
297+
replicas: 3
298+
template:
299+
spec:
300+
containers:
301+
- name: worker
302+
image: myapp:latest
303+
command: ["bundle", "exec", "rake", "journaled_worker:work"]
304+
env:
305+
- name: AWS_DEFAULT_REGION
306+
value: "us-east-1"
307+
# ... other env vars
308+
```
146309

147310
### Attribution
148311

app/models/journaled/event.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def event_type
1818
end
1919

2020
def created_at
21-
@created_at ||= Time.zone.now
21+
@created_at ||= Time.current
2222
end
2323

2424
# Event metadata and configuration (not serialized)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# frozen_string_literal: true
2+
3+
module Journaled
4+
module Outbox
5+
# ActiveRecord model for database-backed event processing
6+
#
7+
# This model is only used when the Outbox::Adapter is configured.
8+
# Events are stored in the database and processed by worker daemons.
9+
#
10+
# Successfully delivered events are deleted immediately.
11+
# Failed events are marked with failed_at and can be queried or requeued.
12+
class Event < Journaled.outbox_base_class_name.constantize
13+
self.table_name = 'journaled_outbox_events'
14+
15+
skip_audit_log
16+
17+
attribute :event_data, :json
18+
19+
validates :event_type, :event_data, :partition_key, :stream_name, presence: true
20+
validates :attempts, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
21+
22+
scope :ready_to_process, -> {
23+
where(failed_at: nil)
24+
.order(:id)
25+
}
26+
27+
scope :failed, -> { where.not(failed_at: nil) }
28+
scope :failed_since, ->(time) { where('failed_at >= ?', time) }
29+
30+
# Fetch a batch of events for processing using SELECT FOR UPDATE
31+
#
32+
# @return [Array<Journaled::Outbox::Event>] Events locked for processing
33+
def self.fetch_batch_for_update
34+
ready_to_process
35+
.limit(Journaled.worker_batch_size)
36+
.lock('FOR UPDATE')
37+
.to_a
38+
end
39+
40+
# Requeue a failed event for processing
41+
#
42+
# Clears failure information and resets attempts so the event can be retried.
43+
#
44+
# @return [Boolean] Whether the requeue was successful
45+
def requeue!
46+
update!(
47+
failed_at: nil,
48+
attempts: 0,
49+
last_error: nil,
50+
)
51+
end
52+
53+
# Extract timestamp from UUID v7 id
54+
#
55+
# UUID v7 embeds a timestamp in the first 48 bits (milliseconds since Unix epoch)
56+
#
57+
# @return [Time] The timestamp embedded in the UUID
58+
def self.timestamp_from_uuid(uuid)
59+
# Remove dashes and take first 12 hex characters (48 bits)
60+
hex_timestamp = uuid.to_s.delete('-')[0, 12]
61+
# Convert from hex to milliseconds since epoch
62+
milliseconds = hex_timestamp.to_i(16)
63+
# Convert to Time object
64+
Time.zone.at(milliseconds / 1000.0)
65+
end
66+
67+
# Get the oldest non-failed event's timestamp
68+
#
69+
# @return [Time, nil] The timestamp of the oldest event, or nil if no events exist
70+
def self.oldest_non_failed_timestamp
71+
oldest = ready_to_process.order(:id).limit(1).pick(:id)
72+
return nil unless oldest
73+
74+
timestamp_from_uuid(oldest)
75+
end
76+
end
77+
end
78+
end
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# frozen_string_literal: true
2+
3+
require 'rails/generators'
4+
require 'rails/generators/active_record'
5+
6+
module Journaled
7+
module Generators
8+
class DatabaseEventsGenerator < Rails::Generators::Base
9+
include ActiveRecord::Generators::Migration
10+
11+
source_root File.expand_path('templates', __dir__)
12+
13+
desc "Generates migration for journaled database-backed event processing"
14+
15+
def install_uuid_generate_v7_migration
16+
migration_template(
17+
"install_uuid_generate_v7.rb.erb",
18+
"db/migrate/install_uuid_generate_v7.rb",
19+
migration_version:,
20+
)
21+
end
22+
23+
def create_journaled_events_migration
24+
migration_template(
25+
"create_journaled_events.rb.erb",
26+
"db/migrate/create_journaled_events.rb",
27+
migration_version:,
28+
)
29+
end
30+
31+
def show_readme
32+
readme "README" if behavior == :invoke
33+
end
34+
35+
private
36+
37+
def migration_version
38+
"[#{ActiveRecord::VERSION::MAJOR}.#{ActiveRecord::VERSION::MINOR}]"
39+
end
40+
end
41+
end
42+
end
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
===============================================================================
2+
3+
Database-backed event processing migrations have been generated!
4+
5+
Next steps:
6+
7+
1. Run migrations:
8+
9+
rails db:migrate
10+
11+
2. Configure Journaled to use the outbox adapter in an initializer:
12+
13+
# config/initializers/journaled.rb
14+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
15+
16+
# Optional configuration:
17+
Journaled.worker_batch_size = 500 # default: 500 (Kinesis max)
18+
Journaled.worker_poll_interval = 5 # default: 5 seconds
19+
Journaled.worker_max_attempts = 3 # default: 3
20+
21+
3. Start the worker daemon:
22+
23+
bundle exec rake journaled_worker:work
24+
25+
4. (Optional) For horizontal scaling, run multiple workers on different
26+
servers/containers. They will coordinate using database locks.
27+
28+
For more information, see the README:
29+
https://github.com/Betterment/journaled#database-backed-event-processing-optional
30+
31+
===============================================================================
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
class CreateJournaledEvents < ActiveRecord::Migration<%= migration_version %>
2+
def change
3+
# UUID v7 primary key (auto-generated by database using uuid_generate_v7())
4+
create_table :journaled_outbox_events, id: :uuid, default: -> { "uuid_generate_v7()" } do |t|
5+
# Event identification and data
6+
t.string :event_type, null: false
7+
t.jsonb :event_data, null: false
8+
t.string :partition_key, null: false
9+
t.string :stream_name, null: false
10+
11+
t.integer :attempts, default: 0, null: false
12+
t.text :last_error
13+
14+
t.datetime :failed_at
15+
end
16+
17+
# Index for querying failed events
18+
add_index :journaled_outbox_events, :failed_at
19+
end
20+
end

0 commit comments

Comments
 (0)