Commit 07941e4

Add database-backed event processing with Outbox pattern
This commit introduces a database-backed outbox pattern for reliable event processing as an alternative to direct Kinesis delivery. This enables transactional event guarantees and better reliability.

Key changes:
- Add Journaled::Outbox::Adapter for database-backed event delivery
- Add Journaled::Outbox::Event model with UUID v7 primary keys
- Add Journaled::Outbox::Worker for processing events from the database
- Add Journaled::Outbox::BatchProcessor for batch delivery to Kinesis
- Add Journaled::Outbox::MetricEmitter for monitoring queue health
- Add generators for creating database tables and UUID v7 support
- Add rake task for running the worker process
- Update KinesisBatchSender to work with database events
- Add comprehensive test coverage for all outbox components

The outbox pattern stores events in PostgreSQL with guaranteed ordering using UUID v7 timestamps, then processes them asynchronously with configurable retry logic and detailed metrics emission.
1 parent 2df7338 commit 07941e4
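
For readers new to the pattern, the flow described in the commit message can be sketched in plain Ruby. This is an illustration under stated assumptions, not the gem's implementation: `InMemoryOutbox`, `OutboxRow`, and `process_batch` are hypothetical names, the array stands in for the `journaled_outbox_events` table, and the three outcome symbols are invented to mirror the "sent", "failed", and "errored" cases named in the README.

```ruby
# Hypothetical in-memory stand-in for a row of journaled_outbox_events.
OutboxRow = Struct.new(:id, :event_data, :failed_at, :failure_reason, keyword_init: true)

# Hypothetical stand-in for the table itself; a real outbox lives in the database
# so that enqueueing can share a transaction with the application's own writes.
class InMemoryOutbox
  attr_reader :rows

  def initialize
    @rows = []
    @next_id = 0
  end

  # Store an event for later delivery.
  def enqueue(event_data)
    @next_id += 1
    row = OutboxRow.new(id: @next_id, event_data: event_data)
    @rows << row
    row
  end

  # Oldest non-failed rows first, capped at the batch size.
  def fetch_batch(limit)
    @rows.select { |row| row.failed_at.nil? }.sort_by(&:id).first(limit)
  end

  def delete(row)
    @rows.delete(row)
  end
end

# One polling pass of a hypothetical worker. The block delivers one event and
# returns :ok, :permanent_failure, or :transient_failure (assumed outcome names).
def process_batch(outbox, batch_size:)
  outbox.fetch_batch(batch_size).each do |row|
    case yield(row.event_data)
    when :ok
      outbox.delete(row) # delivered events are removed immediately
    when :permanent_failure
      row.failed_at = Time.now # kept for inspection and manual requeue
      row.failure_reason = 'rejected by the stream'
    end
    # Any other outcome leaves the row untouched, so the next poll retries it.
  end
end
```

The design point this is meant to show: delivery state lives next to the data, so a crash between "store" and "send" loses nothing, and only rows marked as failed need operator attention.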

File tree

21 files changed

+1823
-178
lines changed


README.md

Lines changed: 159 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ durable, eventually consistent record that discrete events happened.
2222

2323
## Installation
2424

25-
1. If you haven't already,
26-
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
27-
to use one of the following queue adapters:
25+
1. **Configure a queue adapter** (only required if using the default ActiveJob delivery adapter):
26+
27+
If you haven't already,
28+
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
29+
to use one of the following queue adapters:
2830

2931
- `:delayed_job` (via `delayed_job_active_record`)
3032
- `:que`
@@ -52,6 +54,8 @@ to use one of the following queue adapters:
5254
5355
This configuration isn't necessary for applications running Rails 8+.
5456

57+
**Note:** If you plan to use [Database-Backed Event Processing](#database-backed-event-processing-optional) (the Outbox adapter), you can skip this step entirely, because the Outbox adapter does not use ActiveJob.
58+
5559
2. To integrate Journaled into your application, simply include the gem in your
5660
app's Gemfile.
5761
@@ -129,6 +133,37 @@ Journaling provides a number of different configuration options that can be set i
129133
130134
The number of seconds before the :http_handler should timeout while waiting for a HTTP response.
131135
136+
#### `Journaled.delivery_adapter` (default: `Journaled::DeliveryAdapters::ActiveJobAdapter`)
137+
138+
Determines how events are delivered to Kinesis. Two options are available:
139+
140+
- **`Journaled::DeliveryAdapters::ActiveJobAdapter`** (default) - Enqueues events to ActiveJob. Requires a DB-backed queue adapter (see Installation).
141+
142+
- **`Journaled::Outbox::Adapter`** - Stores events in a database table and processes them via separate worker daemons. See [Database-Backed Event Processing](#database-backed-event-processing-optional) for setup instructions.
143+
144+
Example:
145+
```ruby
146+
# Use the database-backed Outbox adapter
147+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
148+
```
149+
150+
#### `Journaled.outbox_base_class_name` (default: `'ActiveRecord::Base'`)
151+
152+
**Only relevant when using `Journaled::Outbox::Adapter`.**
153+
154+
Specifies which ActiveRecord base class the Outbox event storage model (`Journaled::Outbox::Event`) should use for its database connection. This is useful for multi-database setups where you want to store events in a separate database.
155+
156+
Example:
157+
```ruby
158+
# Store outbox events in a separate database
159+
class EventsRecord < ActiveRecord::Base
160+
self.abstract_class = true
161+
connects_to database: { writing: :events, reading: :events }
162+
end
163+
164+
Journaled.outbox_base_class_name = 'EventsRecord'
165+
```
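
As a rough illustration of why this setting is a class *name* rather than a class, the sketch below resolves a string to a superclass when the model class is defined. All names in it (`Settings`, `ApplicationRecordStandIn`, `EventsRecordStandIn`, `OutboxEventStandIn`) are hypothetical stand-ins, and `Object.const_get` is used in place of ActiveSupport's `constantize` so the example runs without Rails.

```ruby
# Hypothetical stand-ins; in a Rails app these would be ActiveRecord base classes.
class ApplicationRecordStandIn
  def self.database
    :primary
  end
end

class EventsRecordStandIn
  def self.database
    :events
  end
end

# Hypothetical settings holder playing the role of the Journaled module.
module Settings
  class << self
    attr_accessor :outbox_base_class_name
  end
  self.outbox_base_class_name = 'EventsRecordStandIn'
end

# The superclass expression is evaluated once, when this class definition is
# first loaded, so the setting must already hold its final value at that point.
class OutboxEventStandIn < Object.const_get(Settings.outbox_base_class_name)
end
```

Under these assumptions, `OutboxEventStandIn.database` returns `:events`. The practical consequence is that the setting belongs in an initializer that runs before the model is first referenced.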
166+
132167
#### ActiveJob `set` options
133168
134169
Both model-level directives accept additional options to be passed into ActiveJob's `set` method:
@@ -143,6 +178,127 @@ has_audit_log enqueue_with: { priority: 30 }
143178
# Or for custom journaling:
144179
journal_attributes :email, enqueue_with: { priority: 20, queue: 'journaled' }
145180
```
181+
##### Database-Backed Event Processing (Optional)
182+
183+
Journaled includes a built-in database-backed delivery adapter with horizontally scalable workers.
184+
185+
**Setup:**
186+
187+
This feature requires creating database tables and is completely optional. Existing users are unaffected.
188+
189+
1. **Generate migrations:**
190+
191+
```bash
192+
rails generate journaled:database_events
193+
rails db:migrate
194+
```
195+
196+
This installs a `uuid_generate_v7()` PostgreSQL function (used for primary keys) and creates a table for storing events:
197+
- `journaled_outbox_events` - Queue of events to be processed (includes `failed_at` column for tracking failures)
198+
199+
2. **Configure to use the database adapter:**
200+
201+
```ruby
202+
# config/initializers/journaled.rb
203+
204+
# Use the database-backed Outbox adapter instead of ActiveJob
205+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
206+
207+
# Optional: Customize worker behavior (these are the defaults)
208+
Journaled.worker_batch_size = 500 # Max events per Kinesis batch (Kinesis API limit)
209+
Journaled.worker_poll_interval = 5 # Seconds between polls
210+
```
211+
212+
**Note:** When using the Outbox adapter, you do **not** need to configure an ActiveJob queue adapter (skip step 1 of Installation). The Outbox adapter uses the `journaled_outbox_events` table for event storage and its own worker daemons for processing, making it independent of ActiveJob. Transactional batching still works seamlessly with the Outbox adapter.
213+
214+
3. **Start worker daemon(s):**
215+
216+
```bash
217+
bundle exec rake journaled_worker:work
218+
```
219+
220+
4. **Monitoring:**
221+
222+
The system emits `ActiveSupport::Notifications` events:
223+
224+
```ruby
225+
# config/initializers/journaled.rb
226+
227+
# Emitted for every batch processed (regardless of outcome)
228+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_process') do |name, start, finish, id, payload|
229+
Statsd.increment('journaled.worker.batches', tags: ["worker:#{payload[:worker_id]}"])
230+
end
231+
232+
# Emitted for successfully sent events
233+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_sent') do |name, start, finish, id, payload|
234+
Statsd.increment('journaled.worker.events_sent', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
235+
end
236+
237+
# Emitted for permanently failed events (marked as failed in database)
238+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_failed') do |name, start, finish, id, payload|
239+
Statsd.increment('journaled.worker.events_failed', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
240+
end
241+
242+
# Emitted for transiently failed events (will be retried)
243+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_errored') do |name, start, finish, id, payload|
244+
Statsd.increment('journaled.worker.events_errored', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
245+
end
246+
247+
# Emitted once per minute with queue statistics
248+
ActiveSupport::Notifications.subscribe('journaled.worker.queue_metrics') do |name, start, finish, id, payload|
249+
Statsd.gauge('journaled.worker.queue.total', payload[:total_count], tags: ["worker:#{payload[:worker_id]}"])
250+
Statsd.gauge('journaled.worker.queue.workable', payload[:workable_count], tags: ["worker:#{payload[:worker_id]}"])
251+
Statsd.gauge('journaled.worker.queue.erroring', payload[:erroring_count], tags: ["worker:#{payload[:worker_id]}"])
252+
Statsd.gauge('journaled.worker.queue.oldest_age_seconds', payload[:oldest_age_seconds], tags: ["worker:#{payload[:worker_id]}"]) if payload[:oldest_age_seconds]
253+
end
254+
```
255+
256+
Queue metrics payload includes:
257+
- `total_count` - Total number of events in the queue (including failed)
258+
- `workable_count` - Events ready to be processed (not failed)
259+
- `erroring_count` - Events with errors but not yet marked as permanently failed
260+
- `oldest_non_failed_timestamp` - Timestamp of the oldest non-failed event (extracted from UUID v7)
261+
- `oldest_age_seconds` - Age in seconds of the oldest non-failed event
262+
263+
Note: Metrics are collected in a background thread to avoid blocking the main worker loop.
264+
265+
5. **Failed Events:**
266+
267+
Inspect and requeue failed events:
268+
269+
```ruby
270+
# Find failed events
271+
Journaled::Outbox::Event.failed.where(stream_name: 'my_stream')
272+
Journaled::Outbox::Event.failed_since(1.hour.ago)
273+
274+
# Requeue a failed event (clears failed_at and failure_reason so a worker retries it)
275+
failed_event = Journaled::Outbox::Event.failed.find(event_uuid) # event_uuid: the event's UUID v7 primary key
276+
failed_event.requeue!
277+
```
278+
279+
6. **Production Deployment:**
280+
281+
Deploy workers as separate services (systemd, Docker, Kubernetes, etc.):
282+
283+
```yaml
284+
# Example Kubernetes deployment
285+
apiVersion: apps/v1
286+
kind: Deployment
287+
metadata:
288+
name: journaled-worker
289+
spec:
290+
replicas: 3
291+
template:
292+
spec:
293+
containers:
294+
- name: worker
295+
image: myapp:latest
296+
command: ["bundle", "exec", "rake", "journaled_worker:work"]
297+
env:
298+
- name: AWS_DEFAULT_REGION
299+
value: "us-east-1"
300+
# ... other env vars
301+
```
146302

147303
### Attribution
148304

app/models/journaled/event.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def event_type
1818
end
1919

2020
def created_at
21-
@created_at ||= Time.zone.now
21+
@created_at ||= Time.current
2222
end
2323

2424
# Event metadata and configuration (not serialized)
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# frozen_string_literal: true
2+
3+
module Journaled
4+
module Outbox
5+
# ActiveRecord model for database-backed event processing
6+
#
7+
# This model is only used when the Outbox::Adapter is configured.
8+
# Events are stored in the database and processed by worker daemons.
9+
#
10+
# Successfully delivered events are deleted immediately.
11+
# Failed events are marked with failed_at and can be queried or requeued.
12+
class Event < Journaled.outbox_base_class_name.constantize
13+
self.table_name = 'journaled_outbox_events'
14+
15+
skip_audit_log
16+
17+
attribute :event_data, :json
18+
19+
validates :event_type, :event_data, :partition_key, :stream_name, presence: true
20+
21+
scope :ready_to_process, -> {
22+
where(failed_at: nil)
23+
.order(:id)
24+
}
25+
26+
scope :failed, -> { where.not(failed_at: nil) }
27+
scope :failed_since, ->(time) { where('failed_at >= ?', time) }
28+
29+
# Fetch a batch of events for processing using SELECT FOR UPDATE
30+
#
31+
# @return [Array<Journaled::Outbox::Event>] Events locked for processing
32+
def self.fetch_batch_for_update
33+
ready_to_process
34+
.limit(Journaled.worker_batch_size)
35+
.lock('FOR UPDATE')
36+
.to_a
37+
end
38+
39+
# Requeue a failed event for processing
40+
#
41+
# Clears failure information so the event can be retried.
42+
#
43+
# @return [Boolean] Whether the requeue was successful
44+
def requeue!
45+
update!(
46+
failed_at: nil,
47+
failure_reason: nil,
48+
)
49+
end
50+
51+
# Extract timestamp from UUID v7 id
52+
#
53+
# UUID v7 embeds a timestamp in the first 48 bits (milliseconds since Unix epoch)
54+
#
55+
# @return [Time] The timestamp embedded in the UUID
56+
def self.timestamp_from_uuid(uuid)
57+
# Remove dashes and take first 12 hex characters (48 bits)
58+
hex_timestamp = uuid.to_s.delete('-')[0, 12]
59+
# Convert from hex to milliseconds since epoch
60+
milliseconds = hex_timestamp.to_i(16)
61+
# Convert to Time object
62+
Time.zone.at(milliseconds / 1000.0)
63+
end
64+
65+
# Get the oldest non-failed event's timestamp
66+
#
67+
# @return [Time, nil] The timestamp of the oldest event, or nil if no events exist
68+
def self.oldest_non_failed_timestamp
69+
oldest = ready_to_process.pick(:id) # ready_to_process already orders by id; pick limits to one row
70+
return nil unless oldest
71+
72+
timestamp_from_uuid(oldest)
73+
end
74+
end
75+
end
76+
end
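
The timestamp extraction in `timestamp_from_uuid` can be tried outside Rails with the sketch below. It swaps `Time.zone.at` for plain `Time.at` so it needs no ActiveSupport. The sample UUID is a hand-built value whose first 48 bits encode 1,700,000,000,000 ms, and `oldest_age_seconds` is a hypothetical helper showing one way the age metric could be derived; how the commit's metric emitter actually computes it is not shown in this diff.

```ruby
# Hand-built UUID v7-shaped value: the first 12 hex digits are 0x018bcfe56800,
# i.e. 1_700_000_000_000 milliseconds since the Unix epoch.
SAMPLE_UUID = '018bcfe5-6800-7000-8000-000000000000'

# Same steps as the model method, using Time.at instead of Time.zone.at.
def timestamp_from_uuid(uuid)
  hex_timestamp = uuid.to_s.delete('-')[0, 12] # first 48 bits as hex
  milliseconds = hex_timestamp.to_i(16)
  Time.at(milliseconds / 1000.0).utc
end

# Hypothetical helper: age of the oldest event, or nil when the queue is empty.
def oldest_age_seconds(oldest_uuid, now: Time.now)
  return nil unless oldest_uuid

  now - timestamp_from_uuid(oldest_uuid)
end

puts timestamp_from_uuid(SAMPLE_UUID).to_i
puts oldest_age_seconds(SAMPLE_UUID, now: Time.at(1_700_000_060))
```

Because the timestamp is recoverable from the primary key, the table needs no separate `created_at` column to report queue age.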
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# frozen_string_literal: true
2+
3+
require 'rails/generators'
4+
require 'rails/generators/active_record'
5+
6+
module Journaled
7+
module Generators
8+
class DatabaseEventsGenerator < Rails::Generators::Base
9+
include ActiveRecord::Generators::Migration
10+
11+
source_root File.expand_path('templates', __dir__)
12+
13+
desc "Generates migration for journaled database-backed event processing"
14+
15+
def install_uuid_generate_v7_migration
16+
migration_template(
17+
"install_uuid_generate_v7.rb.erb",
18+
"db/migrate/install_uuid_generate_v7.rb",
19+
migration_version:,
20+
)
21+
end
22+
23+
def create_journaled_events_migration
24+
migration_template(
25+
"create_journaled_events.rb.erb",
26+
"db/migrate/create_journaled_events.rb",
27+
migration_version:,
28+
)
29+
end
30+
31+
def show_readme
32+
readme "README" if behavior == :invoke
33+
end
34+
35+
private
36+
37+
def migration_version
38+
"[#{ActiveRecord::VERSION::MAJOR}.#{ActiveRecord::VERSION::MINOR}]"
39+
end
40+
end
41+
end
42+
end
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
===============================================================================
2+
3+
Database-backed event processing migrations have been generated!
4+
5+
Next steps:
6+
7+
1. Run migrations:
8+
9+
rails db:migrate
10+
11+
2. Configure Journaled to use the outbox adapter in an initializer:
12+
13+
# config/initializers/journaled.rb
14+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
15+
16+
# Optional configuration:
17+
Journaled.worker_batch_size = 500 # default: 500 (Kinesis max)
18+
Journaled.worker_poll_interval = 5 # default: 5 seconds
19+
20+
3. Start the worker daemon:
21+
22+
bundle exec rake journaled_worker:work
23+
24+
4. (Optional) For horizontal scaling, run multiple workers on different
25+
servers/containers. They will coordinate using database locks.
26+
27+
For more information, see the README:
28+
https://github.com/Betterment/journaled#database-backed-event-processing-optional
29+
30+
===============================================================================
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
class CreateJournaledEvents < ActiveRecord::Migration<%= migration_version %>
2+
def change
3+
# UUID v7 primary key (auto-generated by database using uuid_generate_v7())
4+
create_table :journaled_outbox_events, id: :uuid, default: -> { "uuid_generate_v7()" } do |t|
5+
# Event identification and data
6+
t.string :event_type, null: false
7+
t.jsonb :event_data, null: false
8+
t.string :partition_key, null: false
9+
t.string :stream_name, null: false
10+
11+
t.text :failure_reason
12+
13+
t.datetime :failed_at
14+
end
15+
16+
# Index for querying failed events
17+
add_index :journaled_outbox_events, :failed_at
18+
end
19+
end
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
class InstallUuidGenerateV7 < ActiveRecord::Migration<%= migration_version %>
2+
def up
3+
# Enable pgcrypto extension for gen_random_bytes()
4+
enable_extension 'pgcrypto'
5+
6+
# Install UUID v7 generation function
7+
# Source: https://github.com/Betterment/postgresql-uuid-generate-v7
8+
execute <<-SQL
9+
CREATE OR REPLACE FUNCTION uuid_generate_v7()
10+
RETURNS uuid
11+
LANGUAGE plpgsql
12+
PARALLEL SAFE
13+
AS $$
14+
DECLARE
15+
unix_time_ms CONSTANT bytea NOT NULL DEFAULT substring(int8send((extract(epoch FROM clock_timestamp()) * 1000)::bigint) from 3);
16+
buffer bytea NOT NULL DEFAULT unix_time_ms || gen_random_bytes(10);
17+
BEGIN
18+
buffer = set_byte(buffer, 6, (b'0111' || get_byte(buffer, 6)::bit(4))::bit(8)::int);
19+
buffer = set_byte(buffer, 8, (b'10' || get_byte(buffer, 8)::bit(6))::bit(8)::int);
20+
RETURN encode(buffer, 'hex');
21+
END
22+
$$;
23+
SQL
24+
end
25+
end
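
To make the byte layout in the SQL function easier to follow, here is a Ruby sketch that mirrors it step by step: a 48-bit millisecond timestamp, 10 random bytes, then the version and variant bits patched in. `uuid_v7_sketch` is a hypothetical name used only for illustration; the migration relies on the PostgreSQL function, not on Ruby-side generation, and this sketch truncates sub-millisecond precision rather than reproducing the database's rounding.

```ruby
require 'securerandom'

# Illustrative Ruby counterpart of the uuid_generate_v7() SQL function.
def uuid_v7_sketch(time = Time.now)
  unix_ms = (time.to_f * 1000).to_i

  # Big-endian 64-bit integer, keeping the low 6 bytes (48 bits),
  # followed by 10 random bytes: 16 bytes in total.
  buffer = [unix_ms].pack('Q>')[2, 6] + SecureRandom.random_bytes(10)

  # Byte 6: high nibble becomes 0111 (version 7), low nibble stays random.
  buffer.setbyte(6, 0x70 | (buffer.getbyte(6) & 0x0F))
  # Byte 8: top two bits become 10 (RFC variant), low six bits stay random.
  buffer.setbyte(8, 0x80 | (buffer.getbyte(8) & 0x3F))

  hex = buffer.unpack1('H*')
  [hex[0, 8], hex[8, 4], hex[12, 4], hex[16, 4], hex[20, 12]].join('-')
end

earlier = uuid_v7_sketch(Time.at(1_700_000_000))
later   = uuid_v7_sketch(Time.at(1_700_000_001))
puts earlier
puts later
```

Since the timestamp occupies the most significant bits, IDs created in different milliseconds sort in creation order, which is what `ORDER BY id` in the `ready_to_process` scope depends on. IDs created within the same millisecond differ only in their random bits, so their relative order is arbitrary.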
