Commit f2f42cd

Add database-backed event processing with Outbox pattern
Introduce an optional Outbox pattern implementation as an alternative to ActiveJob-based delivery. Events can now be stored in database tables and processed by background workers, providing better reliability and observability for event delivery.

This implementation includes:

Models & Storage:
- Outbox::Base model with common validations
- Outbox::Event model with distributed locking for concurrent workers
- Outbox::DeadLetter model for failed events with requeue support
- Migration generator for creating required database tables

Processing Infrastructure:
- Outbox::Worker daemon with graceful shutdown and signal handling
- Outbox::BatchProcessor with retry logic and dead letter queue
- Outbox::Adapter implementing the DeliveryAdapter interface
- Rake task for running workers (rake journaled:work)

Configuration & Documentation:
- New configuration options: outbox_base_class_name, worker_batch_size, worker_poll_interval, worker_lock_timeout, worker_max_attempts
- Comprehensive README documentation with setup instructions
- Examples for monitoring and DLQ handling

Testing:
- Complete test coverage for all Outbox components
- Integration tests for end-to-end event processing
- Test database schema with Outbox tables

The default behavior remains unchanged (ActiveJobAdapter). Users can opt into database-backed processing by configuring the Outbox::Adapter.
1 parent 2df7338 commit f2f42cd

File tree

19 files changed

+1576
-20
lines changed

README.md

Lines changed: 149 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ durable, eventually consistent record that discrete events happened.
2222

2323
## Installation
2424

25-
1. If you haven't already,
26-
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
27-
to use one of the following queue adapters:
25+
1. **Configure a queue adapter** (only required if using the default ActiveJob delivery adapter):
26+
27+
If you haven't already,
28+
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
29+
to use one of the following queue adapters:
2830

2931
- `:delayed_job` (via `delayed_job_active_record`)
3032
- `:que`
@@ -52,6 +54,8 @@ to use one of the following queue adapters:
5254
5355
This configuration isn't necessary for applications running Rails 8+.
5456

57+
**Note:** If you plan to use [Database-Backed Event Processing](#database-backed-event-processing-optional) (the Outbox adapter), you can skip this step entirely, because the Outbox adapter does not use ActiveJob.
58+
5559
2. To integrate Journaled into your application, simply include the gem in your
5660
app's Gemfile.
5761
@@ -129,6 +133,37 @@ Journaling provides a number of different configuation options that can be set i
129133
130134
The number of seconds before the :http_handler should timeout while waiting for a HTTP response.
131135
136+
#### `Journaled.delivery_adapter` (default: `Journaled::DeliveryAdapters::ActiveJobAdapter`)
137+
138+
Determines how events are delivered to Kinesis. Two options are available:
139+
140+
- **`Journaled::DeliveryAdapters::ActiveJobAdapter`** (default) - Enqueues events to ActiveJob. Requires a DB-backed queue adapter (see Installation).
141+
142+
- **`Journaled::Outbox::Adapter`** - Stores events in a database table and processes them via separate worker daemons. See [Database-Backed Event Processing](#database-backed-event-processing-optional) for setup instructions.
143+
144+
Example:
145+
```ruby
146+
# Use the database-backed Outbox adapter
147+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
148+
```
149+
150+
#### `Journaled.outbox_base_class_name` (default: `'ActiveRecord::Base'`)
151+
152+
**Only relevant when using `Journaled::Outbox::Adapter`.**
153+
154+
Specifies which ActiveRecord base class the Outbox event storage model (`Journaled::Outbox::Event`) should use for its database connection. This is useful for multi-database setups where you want to store events in a separate database.
155+
156+
Example:
157+
```ruby
158+
# Store outbox events in a separate database
159+
class EventsRecord < ActiveRecord::Base
160+
self.abstract_class = true
161+
connects_to database: { writing: :events, reading: :events }
162+
end
163+
164+
Journaled.outbox_base_class_name = 'EventsRecord'
165+
```
166+
132167
#### ActiveJob `set` options
133168
134169
Both model-level directives accept additional options to be passed into ActiveJob's `set` method:
@@ -143,6 +178,117 @@ has_audit_log enqueue_with: { priority: 30 }
143178
# Or for custom journaling:
144179
journal_attributes :email, enqueue_with: { priority: 20, queue: 'journaled' }
145180
```
181+
### Database-Backed Event Processing (Optional)
182+
183+
Journaled includes a built-in database-backed delivery adapter with horizontally scalable workers. This is useful if you want to:
184+
185+
- **Decouple from Delayed Job/ActiveJob**: Process events independently of your existing job queue
186+
- **Horizontal scaling**: Run multiple worker daemons across different servers/containers
187+
- **Batch API efficiency**: Use Kinesis `put_records` (batch) API instead of `put_record` (single), reducing API calls by up to 500x
188+
- **Custom retry logic**: Fine-tune retry behavior and failed event handling
189+
- **Visibility**: Events are stored in database tables where you can inspect, query, and monitor them
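
The batching claim above can be checked with a little arithmetic. The sketch below is illustrative only: `batched_calls` and `PUT_RECORDS_MAX` are hypothetical names, not part of Journaled, and the only outside fact assumed is the 500-record limit of Kinesis `put_records`.

```ruby
# Kinesis put_records accepts at most 500 records per request.
PUT_RECORDS_MAX = 500

# Number of put_records calls needed to deliver event_count events.
# Uses integer ceiling division so a partial final batch still counts.
def batched_calls(event_count, batch_size = PUT_RECORDS_MAX)
  (event_count + batch_size - 1) / batch_size
end

events = 10_000
single_calls = events             # put_record: one call per event
batch_calls = batched_calls(events)

puts "put_record:  #{single_calls} calls"
puts "put_records: #{batch_calls} calls"
```

The 500x figure is the best case, reached only when every batch is full; a worker that polls a nearly empty table sends smaller batches and saves correspondingly less.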
190+
191+
**Setup:**
192+
193+
This feature requires creating database tables and is completely optional. Existing users are unaffected.
194+
195+
1. **Generate migrations:**
196+
197+
```bash
198+
rails generate journaled:database_events
199+
rails db:migrate
200+
```
201+
202+
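This generates two migrations. One enables the `pgcrypto` extension and installs a `uuid_generate_v7()` PostgreSQL function; the other creates the events table: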
203+
- `journaled_outbox_events` - Queue of events to be processed (includes `failed_at` column for tracking failures)
204+
205+
2. **Configure to use the database adapter:**
206+
207+
```ruby
208+
# config/initializers/journaled.rb
209+
210+
# Use the database-backed Outbox adapter instead of ActiveJob
211+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
212+
213+
# Optional: Customize worker behavior (these are the defaults)
214+
Journaled.worker_batch_size = 500 # Max events per Kinesis batch (Kinesis API limit)
215+
Journaled.worker_poll_interval = 5 # Seconds between polls
216+
Journaled.worker_max_attempts = 3 # Retries before marking as failed
217+
```
218+
219+
**Note:** When using the Outbox adapter, you do **not** need to configure an ActiveJob queue adapter (skip step 1 of Installation). The Outbox adapter uses the `journaled_outbox_events` table for event storage and its own worker daemons for processing, making it independent of ActiveJob. Transactional batching still works seamlessly with the Outbox adapter.
220+
221+
3. **Start worker daemon(s):**
222+
223+
```bash
224+
bundle exec rake journaled_worker:work
225+
```
226+
227+
4. **Monitoring:**
228+
229+
The system emits `ActiveSupport::Notifications` events:
230+
231+
```ruby
232+
# config/initializers/journaled.rb
233+
234+
# Emitted for every batch processed (regardless of outcome)
235+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_process') do |name, start, finish, id, payload|
236+
Statsd.increment('journaled.worker.batches', tags: ["worker:#{payload[:worker_id]}"])
237+
end
238+
239+
# Emitted for successfully sent events
240+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_sent') do |name, start, finish, id, payload|
241+
Statsd.increment('journaled.worker.events_sent', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
242+
end
243+
244+
# Emitted for permanently failed events (marked as failed in database)
245+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_failed') do |name, start, finish, id, payload|
246+
Statsd.increment('journaled.worker.events_failed', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
247+
end
248+
249+
# Emitted for transiently failed events (will be retried)
250+
ActiveSupport::Notifications.subscribe('journaled.worker.batch_errored') do |name, start, finish, id, payload|
251+
Statsd.increment('journaled.worker.events_errored', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
252+
end
253+
```
254+
255+
5. **Failed Events:**
256+
257+
Inspect and requeue failed events:
258+
259+
```ruby
260+
# Find failed events
261+
Journaled::Outbox::Event.failed.where(stream_name: 'my_stream')
262+
Journaled::Outbox::Event.failed_since(1.hour.ago)
263+
264+
# Requeue a failed event (clears failure info and resets attempts)
265+
failed_event = Journaled::Outbox::Event.failed.find(123)
266+
failed_event.requeue!
267+
```
268+
269+
6. **Production Deployment:**
270+
271+
Deploy workers as separate services (systemd, Docker, Kubernetes, etc.):
272+
273+
```yaml
274+
# Example Kubernetes deployment
275+
apiVersion: apps/v1
276+
kind: Deployment
277+
metadata:
278+
name: journaled-worker
279+
spec:
280+
replicas: 3
281+
template:
282+
spec:
283+
containers:
284+
- name: worker
285+
image: myapp:latest
286+
command: ["bundle", "exec", "rake", "journaled_worker:work"]
287+
env:
288+
- name: AWS_DEFAULT_REGION
289+
value: "us-east-1"
290+
# ... other env vars
291+
```
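
To make the relationship between `worker_max_attempts`, `failed_at`, and `requeue!` concrete, here is a minimal in-memory sketch. It assumes the worker increments `attempts` on each delivery error and sets `failed_at` once the limit is reached; the batch processor source is not shown here, so `record_failure` and `OutboxRow` are hypothetical stand-ins rather than the gem's code.

```ruby
# Hypothetical in-memory stand-in for a row in journaled_outbox_events.
OutboxRow = Struct.new(:id, :attempts, :last_error, :failed_at, keyword_init: true) do
  def failed?
    !failed_at.nil?
  end

  # Mirrors the fields that Journaled::Outbox::Event#requeue! resets.
  def requeue!
    self.failed_at = nil
    self.attempts = 0
    self.last_error = nil
    self
  end
end

MAX_ATTEMPTS = 3 # same value as the Journaled.worker_max_attempts default

# Assumed policy: count the attempt, remember the error, and mark the row
# as failed once the attempt limit is reached.
def record_failure(row, error_message, now: Time.now)
  row.attempts += 1
  row.last_error = error_message
  row.failed_at = now if row.attempts >= MAX_ATTEMPTS
  row
end

row = OutboxRow.new(id: 1, attempts: 0)
MAX_ATTEMPTS.times { record_failure(row, 'delivery error') }
puts "failed after #{row.attempts} attempts: #{row.failed?}"

row.requeue!
puts "failed after requeue: #{row.failed?}"
```

Because a requeued event starts again from zero attempts, it gets the full retry budget a second time, which is worth keeping in mind when requeueing in bulk.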
146292

147293
### Attribution
148294

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# frozen_string_literal: true
2+
3+
module Journaled
4+
module Outbox
5+
# ActiveRecord model for database-backed event processing
6+
#
7+
# This model is only used when the Outbox::Adapter is configured.
8+
# Events are stored in the database and processed by worker daemons.
9+
#
10+
# Successfully delivered events are deleted immediately.
11+
# Failed events are marked with failed_at and can be queried or requeued.
12+
class Event < Journaled.outbox_base_class_name.constantize
13+
self.table_name = 'journaled_outbox_events'
14+
15+
attribute :event_data, :json
16+
17+
validates :event_type, :event_data, :partition_key, :stream_name, presence: true
18+
validates :attempts, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
19+
20+
scope :ready_to_process, -> {
21+
where(failed_at: nil)
22+
.order(:id)
23+
}
24+
25+
scope :failed, -> { where.not(failed_at: nil) }
26+
scope :failed_since, ->(time) { where('failed_at >= ?', time) }
27+
28+
# Fetch a batch of events for processing using SELECT FOR UPDATE SKIP LOCKED
29+
#
30+
# @return [Array<Journaled::Outbox::Event>] Events locked for processing
31+
def self.fetch_batch_for_update
32+
ready_to_process
33+
.limit(Journaled.worker_batch_size)
34+
.lock('FOR UPDATE SKIP LOCKED')
35+
.to_a
36+
end
37+
38+
# Requeue a failed event for processing
39+
#
40+
# Clears failure information and resets attempts so the event can be retried.
41+
#
42+
# @return [true] when the update succeeds
# @raise [ActiveRecord::RecordInvalid] if the record fails validation
43+
def requeue!
44+
update!(
45+
failed_at: nil,
46+
attempts: 0,
47+
last_error: nil,
48+
)
49+
end
50+
end
51+
end
52+
end
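
The effect of `FOR UPDATE SKIP LOCKED` in `fetch_batch_for_update` can be illustrated without a database. The sketch below is a plain-Ruby simulation, not the gem's code: each hypothetical `Row` carries a `Mutex` standing in for a PostgreSQL row lock, and `fetch_batch` skips any row whose lock is already held instead of waiting for it.

```ruby
# Hypothetical in-memory row; the Mutex stands in for a database row lock.
Row = Struct.new(:id, :lock)

# Walk rows in id order and claim those whose lock is free. Rows that are
# already locked are skipped without blocking, which is the behavior
# SKIP LOCKED gives a real SELECT ... FOR UPDATE.
def fetch_batch(rows, limit)
  claimed = []
  rows.sort_by(&:id).each do |row|
    break if claimed.size >= limit
    claimed << row if row.lock.try_lock
  end
  claimed
end

rows = (1..6).map { |id| Row.new(id, Mutex.new) }

worker_a = fetch_batch(rows, 3) # first worker claims the oldest rows
worker_b = fetch_batch(rows, 3) # second worker skips them and takes the next ones

puts "worker A: #{worker_a.map(&:id).inspect}"
puts "worker B: #{worker_b.map(&:id).inspect}"

# Committing or rolling back the transaction is what releases real row locks.
(worker_a + worker_b).each { |row| row.lock.unlock }
```

In the real model the locks last only as long as the surrounding transaction, so the caller presumably needs to fetch and process the batch inside one transaction for the claim to hold.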
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# frozen_string_literal: true
2+
3+
require 'rails/generators'
4+
require 'rails/generators/active_record'
5+
6+
module Journaled
7+
module Generators
8+
class DatabaseEventsGenerator < Rails::Generators::Base
9+
include ActiveRecord::Generators::Migration
10+
11+
source_root File.expand_path('templates', __dir__)
12+
13+
desc "Generates migration for journaled database-backed event processing"
14+
15+
def install_uuid_generate_v7_migration
16+
migration_template(
17+
"install_uuid_generate_v7.rb.erb",
18+
"db/migrate/install_uuid_generate_v7.rb",
19+
migration_version:,
20+
)
21+
end
22+
23+
def create_journaled_events_migration
24+
migration_template(
25+
"create_journaled_events.rb.erb",
26+
"db/migrate/create_journaled_events.rb",
27+
migration_version:,
28+
)
29+
end
30+
31+
def show_readme
32+
readme "README" if behavior == :invoke
33+
end
34+
35+
private
36+
37+
def migration_version
38+
"[#{ActiveRecord::VERSION::MAJOR}.#{ActiveRecord::VERSION::MINOR}]"
39+
end
40+
end
41+
end
42+
end
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
===============================================================================
2+
3+
Database-backed event processing migrations have been generated!
4+
5+
Next steps:
6+
7+
1. Run migrations:
8+
9+
rails db:migrate
10+
11+
2. Configure Journaled to use the outbox adapter in an initializer:
12+
13+
# config/initializers/journaled.rb
14+
Journaled.delivery_adapter = Journaled::Outbox::Adapter
15+
16+
# Optional configuration:
17+
Journaled.worker_batch_size = 500 # default: 500 (Kinesis max)
18+
Journaled.worker_poll_interval = 5 # default: 5 seconds
19+
Journaled.worker_max_attempts = 3 # default: 3
20+
21+
3. Start the worker daemon:
22+
23+
bundle exec rake journaled_worker:work
24+
25+
4. (Optional) For horizontal scaling, run multiple workers on different
26+
servers/containers. They will coordinate using database locks.
27+
28+
For more information, see the README:
29+
https://github.com/Betterment/journaled#database-backed-event-processing-optional
30+
31+
===============================================================================
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
class CreateJournaledEvents < ActiveRecord::Migration<%= migration_version %>
2+
def change
3+
# UUID v7 primary key (auto-generated by database using uuid_generate_v7())
4+
create_table :journaled_outbox_events, id: :uuid, default: -> { "uuid_generate_v7()" } do |t|
5+
# Event identification and data
6+
t.string :event_type, null: false
7+
t.jsonb :event_data, null: false
8+
t.string :partition_key, null: false
9+
t.string :stream_name, null: false
10+
11+
# Retry logic
12+
t.integer :attempts, default: 0, null: false
13+
t.text :last_error
14+
15+
# Failure tracking (instead of separate DLQ table)
16+
t.datetime :failed_at
17+
end
18+
19+
# Index for querying failed events
20+
add_index :journaled_outbox_events, :failed_at
21+
end
22+
end
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
class InstallUuidGenerateV7 < ActiveRecord::Migration<%= migration_version %>
2+
def up
3+
# Enable pgcrypto extension for gen_random_bytes()
4+
enable_extension 'pgcrypto'
5+
6+
# Install UUID v7 generation function
7+
# Source: https://github.com/Betterment/postgresql-uuid-generate-v7
8+
execute <<-SQL
9+
CREATE OR REPLACE FUNCTION uuid_generate_v7()
10+
RETURNS uuid
11+
LANGUAGE plpgsql
12+
PARALLEL SAFE
13+
AS $$
14+
DECLARE
15+
unix_time_ms CONSTANT bytea NOT NULL DEFAULT substring(int8send((extract(epoch FROM clock_timestamp()) * 1000)::bigint) from 3);
16+
buffer bytea NOT NULL DEFAULT unix_time_ms || gen_random_bytes(10);
17+
BEGIN
18+
buffer = set_byte(buffer, 6, (b'0111' || get_byte(buffer, 6)::bit(4))::bit(8)::int);
19+
buffer = set_byte(buffer, 8, (b'10' || get_byte(buffer, 8)::bit(6))::bit(8)::int);
20+
RETURN encode(buffer, 'hex');
21+
END
22+
$$;
23+
SQL
24+
end
25+
end
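
The byte layout built by `uuid_generate_v7()` may be easier to follow in Ruby. The method below is an illustrative translation of the SQL above, not something the gem ships; it reuses the SQL function's name only for readability. It concatenates a 48-bit millisecond timestamp with 10 random bytes, then overwrites the version and variant bits.

```ruby
require 'securerandom'

# Illustrative Ruby translation of the SQL uuid_generate_v7() function.
def uuid_generate_v7(now = Time.now)
  # Milliseconds since the Unix epoch (truncated here; the SQL cast rounds).
  unix_time_ms = (now.to_r * 1000).to_i

  # Big-endian 64-bit integer with the top two bytes dropped: 48 bits.
  timestamp = [unix_time_ms].pack('Q>').byteslice(2, 6)

  # 6 timestamp bytes + 10 random bytes = 16 bytes.
  bytes = (timestamp + SecureRandom.random_bytes(10)).bytes

  bytes[6] = 0x70 | (bytes[6] & 0x0F) # high nibble 0111: version 7
  bytes[8] = 0x80 | (bytes[8] & 0x3F) # top bits 10: RFC variant

  hex = bytes.pack('C*').unpack1('H*')
  [hex[0, 8], hex[8, 4], hex[12, 4], hex[16, 4], hex[20, 12]].join('-')
end

puts uuid_generate_v7
```

Because the timestamp occupies the leading bytes, values generated in later milliseconds sort after earlier ones, which is what makes `order(:id)` in `ready_to_process` approximate insertion order.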

lib/journaled.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@
1010
require 'journaled/connection'
1111
require 'journaled/delivery_adapter'
1212
require 'journaled/delivery_adapters/active_job_adapter'
13+
require 'journaled/outbox/adapter'
1314
require 'journaled/kinesis_client_factory'
1415
require 'journaled/kinesis_batch_sender'
16+
require 'journaled/outbox/batch_processor'
17+
require 'journaled/outbox/worker'
1518

1619
module Journaled
1720
SUPPORTED_QUEUE_ADAPTERS = %w(delayed delayed_job good_job que).freeze
@@ -22,9 +25,15 @@ module Journaled
2225
mattr_accessor(:http_open_timeout) { 2 }
2326
mattr_accessor(:http_read_timeout) { 60 }
2427
mattr_accessor(:job_base_class_name) { 'ActiveJob::Base' }
28+
mattr_accessor(:outbox_base_class_name) { 'ActiveRecord::Base' }
2529
mattr_accessor(:delivery_adapter) { Journaled::DeliveryAdapters::ActiveJobAdapter }
2630
mattr_writer(:transactional_batching_enabled) { true }
2731

32+
# Worker configuration (for database-backed event processing)
33+
mattr_accessor(:worker_batch_size) { 500 } # Kinesis PutRecords API max
34+
mattr_accessor(:worker_poll_interval) { 5 } # seconds
35+
mattr_accessor(:worker_max_attempts) { 3 }
36+
2837
def self.transactional_batching_enabled?
2938
Thread.current[:journaled_transactional_batching_enabled] || @@transactional_batching_enabled
3039
end
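
The worker settings above suggest a polling loop: take up to `worker_batch_size` events, and sleep for `worker_poll_interval` seconds when nothing is waiting. The worker source is not part of this excerpt, so the following is a hypothetical sketch of that shape with a cooperative stop flag; `PollingWorker` and its methods are illustrative names, not the gem's API.

```ruby
# Hypothetical polling worker; not the gem's implementation.
class PollingWorker
  def initialize(poll_interval:, &process_batch)
    @poll_interval = poll_interval
    @process_batch = process_batch
    @running = true
  end

  # Ask the loop to exit once the batch in progress has finished.
  def stop
    @running = false
  end

  def run
    while @running
      processed = @process_batch.call
      # Sleep only when the queue looked empty, so a backlog drains quickly.
      sleep(@poll_interval) if @running && processed.zero?
    end
  end
end

queue = (1..1200).to_a # stand-in for rows waiting in the outbox table
batches = []
worker = nil
worker = PollingWorker.new(poll_interval: 0.01) do
  batch = queue.shift(500) # 500 mirrors the worker_batch_size default
  batches << batch.size unless batch.empty?
  worker.stop if queue.empty? # demo only; a daemon would keep polling
  batch.size
end

# A real daemon would likely call worker.stop from a signal handler,
# e.g. Signal.trap('TERM') { worker.stop }.
worker.run
puts batches.inspect
```

Checking the stop flag only between batches is one way to get a graceful shutdown: a batch that has started is allowed to finish before the process exits.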
