Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,29 @@ jobs:
test:
runs-on: ubuntu-latest

services:
postgres:
image: postgres:14
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: journaled_test
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5

strategy:
fail-fast: false
matrix:
ruby: ['3.2', '3.3', '3.4']
gemfile:
- gemfiles/rails_7_2.gemfile
- gemfiles/rails_8_0.gemfile
database: ['sqlite3', 'postgresql']
steps:
- uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3
- uses: ruby/setup-ruby@e34163cd15f4bb403dcd72d98e295997e6a55798 # v1
Expand All @@ -21,7 +37,21 @@ jobs:
with:
ruby-version: ${{ matrix.ruby }}
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- name: Setup database
if: matrix.database == 'postgresql'
env:
DATABASE_ADAPTER: postgresql
DATABASE_USER: postgres
DATABASE_PASSWORD: postgres
DATABASE_HOST: localhost
BUNDLE_GEMFILE: ${{ matrix.gemfile }}
run: |
bundle exec rake db:create db:schema:load
- name: Run tests
env:
BUNDLE_GEMFILE: ${{ matrix.gemfile }}
DATABASE_ADAPTER: ${{ matrix.database }}
DATABASE_USER: postgres
DATABASE_PASSWORD: postgres
DATABASE_HOST: localhost
run: bundle exec rake spec
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ source 'https://rubygems.org'

gemspec

gem 'pg'
gem 'sqlite3', '>= 2.1'
5 changes: 4 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
journaled (6.1.0)
journaled (6.2.0)
activejob
activerecord
activesupport
Expand Down Expand Up @@ -119,6 +119,8 @@ GEM
parser (3.3.7.0)
ast (~> 2.4.1)
racc
pg (1.6.2-arm64-darwin)
pg (1.6.2-x86_64-linux)
pp (0.6.2)
prettyprint
prettyprint (0.2.0)
Expand Down Expand Up @@ -245,6 +247,7 @@ DEPENDENCIES
appraisal
betterlint
journaled!
pg
rspec-rails
rspec_junit_formatter
spring
Expand Down
137 changes: 134 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ durable, eventually consistent record that discrete events happened.

## Installation

1. If you haven't already,
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
to use one of the following queue adapters:
1. **Configure a queue adapter** (only required if using the default ActiveJob delivery adapter):

If you haven't already,
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
to use one of the following queue adapters:

- `:delayed_job` (via `delayed_job_active_record`)
- `:que`
Expand Down Expand Up @@ -52,6 +54,8 @@ to use one of the following queue adapters:

This configuration isn't necessary for applications running Rails 8+.

**Note:** If you plan to use the [Outbox-style Event Processing](#outbox-style-event-processing-optional) (Outbox adapter), you can skip this step entirely, as the Outbox adapter does not use ActiveJob.

2. To integrate Journaled into your application, simply include the gem in your
app's Gemfile.

Expand Down Expand Up @@ -129,6 +133,37 @@ Journaling provides a number of different configuation options that can be set i

The number of seconds before the :http_handler should timeout while waiting for a HTTP response.

#### `Journaled.delivery_adapter` (default: `Journaled::DeliveryAdapters::ActiveJobAdapter`)

Determines how events are delivered to Kinesis. Two options are available:

- **`Journaled::DeliveryAdapters::ActiveJobAdapter`** (default) - Enqueues events to ActiveJob. Requires a DB-backed queue adapter (see Installation).

- **`Journaled::Outbox::Adapter`** - Stores events in a database table and processes them via separate worker daemons. See [Outbox-style Event Processing](#outbox-style-event-processing-optional) for setup instructions.

Example:
```ruby
# Use the Outbox-style adapter
Journaled.delivery_adapter = Journaled::Outbox::Adapter
```

#### `Journaled.outbox_base_class_name` (default: `'ActiveRecord::Base'`)

**Only relevant when using `Journaled::Outbox::Adapter`.**

Specifies which ActiveRecord base class the Outbox event storage model (`Journaled::Outbox::Event`) should use for its database connection. This is useful for multi-database setups where you want to store events in a separate database.

Example:
```ruby
# Store outbox events in a separate database
class EventsRecord < ActiveRecord::Base
self.abstract_class = true
connects_to database: { writing: :events, reading: :events }
end

Journaled.outbox_base_class_name = 'EventsRecord'
```

#### ActiveJob `set` options

Both model-level directives accept additional options to be passed into ActiveJob's `set` method:
Expand All @@ -143,6 +178,102 @@ has_audit_log enqueue_with: { priority: 30 }
# Or for custom journaling:
journal_attributes :email, enqueue_with: { priority: 20, queue: 'journaled' }
```
##### Outbox-style Event Processing (Optional)

Journaled includes a built-in Outbox-style delivery adapter with horizontally scalable workers.

**Setup:**

This feature requires creating database tables and is completely optional. Existing users are unaffected.

1. **Install migrations:**

```bash
rake journaled:install:migrations
rails db:migrate
```

This creates a table for storing events:
- `journaled_outbox_events` - Queue of events to be processed (includes `failed_at` column for tracking failures)

2. **Configure to use the database adapter:**

```ruby
# config/initializers/journaled.rb

# Use the Outbox-style adapter instead of ActiveJob
Journaled.delivery_adapter = Journaled::Outbox::Adapter

# Optional: Customize worker behavior (these are the defaults)
Journaled.worker_batch_size = 500 # Max events per Kinesis batch (Kinesis API limit)
Journaled.worker_poll_interval = 5 # Seconds between polls
```

**Note:** When using the Outbox adapter, you do **not** need to configure an ActiveJob queue adapter (skip step 1 of Installation). The Outbox adapter uses the `journaled_outbox_events` table for event storage and its own worker daemons for processing, making it independent of ActiveJob. Transactional batching still works seamlessly with the Outbox adapter.

3. **Start worker daemon(s):**

```bash
bundle exec rake journaled_worker:work
```

4. **Monitoring:**

The system emits `ActiveSupport::Notifications` events:

```ruby
# config/initializers/journaled.rb

# Emitted for every batch processed (regardless of outcome)
ActiveSupport::Notifications.subscribe('journaled.worker.batch_process') do |name, start, finish, id, payload|
Statsd.increment('journaled.worker.batches', tags: ["worker:#{payload[:worker_id]}"])
end

# Emitted for successfully sent events
ActiveSupport::Notifications.subscribe('journaled.worker.batch_sent') do |name, start, finish, id, payload|
Statsd.increment('journaled.worker.events_sent', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
end

# Emitted for permanently failed events (marked as failed in database)
ActiveSupport::Notifications.subscribe('journaled.worker.batch_failed') do |name, start, finish, id, payload|
Statsd.increment('journaled.worker.events_failed', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
end

# Emitted for transiently failed events (will be retried)
ActiveSupport::Notifications.subscribe('journaled.worker.batch_errored') do |name, start, finish, id, payload|
Statsd.increment('journaled.worker.events_errored', payload[:event_count], tags: ["worker:#{payload[:worker_id]}"])
end

# Emitted once per minute with queue statistics
ActiveSupport::Notifications.subscribe('journaled.worker.queue_metrics') do |name, start, finish, id, payload|
Statsd.gauge('journaled.worker.queue.total', payload[:total_count], tags: ["worker:#{payload[:worker_id]}"])
Statsd.gauge('journaled.worker.queue.workable', payload[:workable_count], tags: ["worker:#{payload[:worker_id]}"])
Statsd.gauge('journaled.worker.queue.erroring', payload[:erroring_count], tags: ["worker:#{payload[:worker_id]}"])
Statsd.gauge('journaled.worker.queue.oldest_age_seconds', payload[:oldest_age_seconds], tags: ["worker:#{payload[:worker_id]}"]) if payload[:oldest_age_seconds]
end
```

Queue metrics payload includes:
- `total_count` - Total number of events in the queue (including failed)
- `workable_count` - Events ready to be processed (not failed)
- `erroring_count` - Events with errors but not yet marked as permanently failed
- `oldest_non_failed_timestamp` - Timestamp of the oldest non-failed event (extracted from UUID v7)
- `oldest_age_seconds` - Age in seconds of the oldest non-failed event

Note: Metrics are collected in a background thread to avoid blocking the main worker loop.

5. **Failed Events:**

Inspect and requeue failed events:

```ruby
# Find failed events
Journaled::Outbox::Event.failed.where(stream_name: 'my_stream')

# Requeue a failed event (clears failure info and resets attempts)
failed_event = Journaled::Outbox::Event.failed.find(123)
failed_event.requeue!
```

### Attribution

Expand Down
49 changes: 1 addition & 48 deletions app/jobs/journaled/delivery_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

module Journaled
class DeliveryJob < ApplicationJob
DEFAULT_REGION = 'us-east-1'

rescue_from(Aws::Kinesis::Errors::InternalFailure, Aws::Kinesis::Errors::ServiceUnavailable, Aws::Kinesis::Errors::Http503Error) do |e|
Rails.logger.error "Kinesis Error - Server Error occurred - #{e.class}"
raise KinesisTemporaryFailure
Expand All @@ -20,16 +18,6 @@ def perform(*events)
journal! if Journaled.enabled?
end

def kinesis_client_config
{
region: ENV.fetch('AWS_DEFAULT_REGION', DEFAULT_REGION),
retry_limit: 0,
http_idle_timeout: Journaled.http_idle_timeout,
http_open_timeout: Journaled.http_open_timeout,
http_read_timeout: Journaled.http_read_timeout,
}.merge(credentials)
end

private

KinesisRecord = Struct.new(:serialized_event, :partition_key, :stream_name, keyword_init: true) do
Expand All @@ -51,42 +39,7 @@ def journal!
end

def kinesis_client
Aws::Kinesis::Client.new(kinesis_client_config)
end

def credentials
if ENV.key?('JOURNALED_IAM_ROLE_ARN')
{
credentials: iam_assume_role_credentials,
}
else
legacy_credentials_hash_if_present
end
end

def legacy_credentials_hash_if_present
if ENV.key?('RUBY_AWS_ACCESS_KEY_ID')
{
access_key_id: ENV.fetch('RUBY_AWS_ACCESS_KEY_ID'),
secret_access_key: ENV.fetch('RUBY_AWS_SECRET_ACCESS_KEY'),
}
else
{}
end
end

def sts_client
Aws::STS::Client.new({
region: ENV.fetch('AWS_DEFAULT_REGION', DEFAULT_REGION),
}.merge(legacy_credentials_hash_if_present))
end

def iam_assume_role_credentials
@iam_assume_role_credentials ||= Aws::AssumeRoleCredentials.new(
client: sts_client,
role_arn: ENV.fetch('JOURNALED_IAM_ROLE_ARN'),
role_session_name: "JournaledAssumeRoleAccess",
)
@kinesis_client ||= KinesisClientFactory.build
end

class KinesisTemporaryFailure < NotTrulyExceptionalError
Expand Down
60 changes: 60 additions & 0 deletions app/models/journaled/outbox/event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# frozen_string_literal: true

module Journaled
module Outbox
# ActiveRecord model for Outbox-style event processing
#
# This model is only used when the Outbox::Adapter is configured.
# Events are stored in the database and processed by worker daemons.
#
# Successfully delivered events are deleted immediately.
# Failed events are marked with failed_at and can be queried or requeued.
class Event < Journaled.outbox_base_class_name.constantize
self.table_name = 'journaled_outbox_events'

self.record_timestamps = false # use db default

skip_audit_log

attribute :event_data, :json

validates :event_type, :event_data, :partition_key, :stream_name, presence: true

scope :ready_to_process, -> {
where(failed_at: nil)
.order(:id)
}

scope :failed, -> { where.not(failed_at: nil) }

# Fetch a batch of events for processing using SELECT FOR UPDATE
#
# @return [Array<Journaled::Outbox::Event>] Events locked for processing
def self.fetch_batch_for_update
ready_to_process
.limit(Journaled.worker_batch_size)
.lock
.to_a
end

# Requeue a failed event for processing
#
# Clears failure information so the event can be retried.
#
# @return [Boolean] Whether the requeue was successful
def requeue!
update!(
failed_at: nil,
failure_reason: nil,
)
end

# Get the oldest non-failed event's timestamp
#
# @return [Time, nil] The timestamp of the oldest event, or nil if no events exist
def self.oldest_non_failed_timestamp
ready_to_process.order(:id).limit(1).pick(:created_at)
end
end
end
end
Loading