Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Changelog
All notable changes to this project will be documented in this file.

## [1.7.0] - 2025-08-19
### Added
- Ability to specify a custom job class for publishing via `publishing_job_class_callable` config.
- Ability to specify a default queue for publishing jobs via `default_publishing_job_queue` config.
- Ability to specify a custom queue per publish call via `custom_queue_name` argument.

## [1.5.0] - 2025-05-19
### Added
- Added ability to split log message into parts
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ GIT
PATH
remote: .
specs:
rabbit_messaging (1.6.3)
rabbit_messaging (1.7.0)
bunny (~> 2.0)
kicks

Expand Down
27 changes: 19 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ require "rabbit_messaging"
}
```

- `publishing_job_class_callable` (`Proc`)

Custom job class (e.g. ActiveJob or Sidekiq::Job) to work with published messages.

- `default_publishing_job_queue` (`String` or `Symbol`)

The name of the queue that will be used by default for publishing jobs. `default` by default.

- `before_receiving_hooks, after_receiving_hooks` (`Array of Procs`)

Before and after hooks with message processing in the middle. Where `before_receiving_hooks` and `after_receiving_hooks` are empty arrays by default.
Expand Down Expand Up @@ -139,7 +147,7 @@ require "rabbit_messaging"
- `connection_reset_max_retries` (`Integer`)

Maximum number of reconnection attempts after a connection loss. Default: 10.

```ruby
config.connection_reset_max_retries = 20
```
Expand All @@ -165,13 +173,16 @@ require "rabbit_messaging"

```ruby
Rabbit.publish(
routing_key: :support,
event: :ping,
data: { foo: :bar }, # default is {}
exchange_name: 'fanout', # default is fine too
confirm_select: true, # setting this to false grants you great speed up and absolutelly no guarantees
headers: { "foo" => "bar" }, # custom arguments for routing, default is {}
message_id: "asdadsadsad", # A unique identifier such as a UUID that your application can use to identify the message.
{
routing_key: :support,
event: :ping,
data: { foo: :bar }, # default is {}
exchange_name: 'fanout', # default is fine too
confirm_select: true, # setting this to false grants you great speed up and absolutelly no guarantees
headers: { "foo" => "bar" }, # custom arguments for routing, default is {}
message_id: "asdadsadsad", # A unique identifier such as a UUID that your application can use to identify the message.
},
custom_queue_name: :my_custom_queue, # The name of the queue for publishing jobs. Overrides the default queue.
)
```

Expand Down
36 changes: 32 additions & 4 deletions lib/rabbit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class Config
:hooks,
:queue_name_conversion,
:receiving_job_class_callable,
:publishing_job_class_callable,
:default_publishing_job_queue,
:handler_resolver_callable,
:exception_notifier,
:before_receiving_hooks,
Expand All @@ -41,6 +43,8 @@ def initialize( # rubocop:disable Metrics/MethodLength
environment: :production,
queue_name_conversion: nil,
receiving_job_class_callable: nil,
publishing_job_class_callable: nil,
default_publishing_job_queue: :default,
handler_resolver_callable: nil,
exception_notifier: nil,
before_receiving_hooks: [],
Expand All @@ -63,6 +67,8 @@ def initialize( # rubocop:disable Metrics/MethodLength
self.environment = environment.to_sym
self.queue_name_conversion = queue_name_conversion
self.receiving_job_class_callable = receiving_job_class_callable
self.publishing_job_class_callable = publishing_job_class_callable
self.default_publishing_job_queue = default_publishing_job_queue
self.handler_resolver_callable = handler_resolver_callable
self.exception_notifier = exception_notifier
self.before_receiving_hooks = before_receiving_hooks
Expand Down Expand Up @@ -163,13 +169,35 @@ def configure
config.validate!
end

def publish(message_options)
message = Publishing::Message.new(message_options)
def publish(
routing_key: nil,
event: nil,
data: {},
exchange_name: [],
confirm_select: true,
realtime: false,
headers: {},
message_id: nil,
custom_queue_name: nil
)
message = Publishing::Message.new(
routing_key: routing_key,
event: event,
data: data,
exchange_name: exchange_name,
confirm_select: confirm_select,
realtime: realtime,
headers: headers,
message_id: message_id,
)
job_class = config.publishing_job_class_callable
publish_job_callable = job_class.is_a?(Proc) ? job_class.call : (job_class || Publishing::Job)
queue_name = custom_queue_name || default_queue_name

if message.realtime?
Publishing.publish(message)
else
Publishing::Job.set(queue: default_queue_name).perform_later(message.to_hash)
publish_job_callable.set(queue: queue_name).perform_later(message.to_hash)
end
end

Expand All @@ -179,6 +207,6 @@ def queue_name(queue, ignore_conversion: false)
end

def default_queue_name(ignore_conversion: false)
queue_name(:default, ignore_conversion: ignore_conversion)
queue_name(config.default_publishing_job_queue, ignore_conversion: ignore_conversion)
end
end
2 changes: 1 addition & 1 deletion lib/rabbit/publishing/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
module Rabbit::Publishing
class Job < ActiveJob::Base
def perform(message)
Rabbit::Publishing.publish(Message.new(message))
Rabbit::Publishing.publish(Message.new(**message))
end
end
end
27 changes: 18 additions & 9 deletions lib/rabbit/publishing/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,24 @@ class Message
alias_method :confirm_select?, :confirm_select
alias_method :realtime?, :realtime

def initialize(attributes = {})
self.routing_key = attributes[:routing_key]
self.event = attributes[:event]&.to_s
self.data = attributes.fetch(:data, {})
self.exchange_name = Array(attributes.fetch(:exchange_name, []))
self.confirm_select = attributes.fetch(:confirm_select, true)
self.realtime = attributes.fetch(:realtime, false)
self.headers = attributes.fetch(:headers, {})
self.message_id = attributes[:message_id]
def initialize(
routing_key: nil,
event: nil,
data: {},
exchange_name: [],
confirm_select: true,
realtime: false,
headers: {},
message_id: nil
)
self.routing_key = routing_key
self.event = event&.to_s
self.data = data
self.exchange_name = Array(exchange_name)
self.confirm_select = confirm_select
self.realtime = realtime
self.headers = headers
self.message_id = message_id
end

def to_hash
Expand Down
2 changes: 1 addition & 1 deletion lib/rabbit/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Rabbit
VERSION = "1.6.3"
VERSION = "1.7.0"
end
2 changes: 1 addition & 1 deletion spec/units/rabbit/publishing/message_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

describe Rabbit::Publishing::Message do
describe "basic_publish_args" do
subject(:message) { described_class.new(attributes) }
subject(:message) { described_class.new(**attributes) }

context "rounting key not specified" do
let(:attributes) { Hash[event: :ping] }
Expand Down
65 changes: 59 additions & 6 deletions spec/units/rabbit_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
message_id: "uuid",
}
end
let(:additional_params) { {} }

before do
Rabbit.config.queue_name_conversion = -> (queue) { "#{queue}_prepared" }
Expand Down Expand Up @@ -52,8 +53,8 @@

it "publishes" do
if expect_to_use_job
set_params = { queue: "default_prepared" }
expect(Rabbit::Publishing::Job).to receive(:set).with(set_params).and_call_original
set_params = { queue: expected_queue }
expect(job_class).to receive(:set).with(set_params).and_call_original
perform_params = {
routing_key: "some_queue",
event: "some_event",
Expand All @@ -68,7 +69,7 @@
.to receive(:perform_later).with(perform_params).and_call_original

else
expect(Rabbit::Publishing::Job).not_to receive(:perform_later)
expect(job_class).not_to receive(:perform_later)
end

expect(publish_logger).to receive(:debug).with(<<~MSG.strip)
Expand All @@ -79,7 +80,7 @@
test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar"} / some_event / \
confirm: ...world"}
MSG
described_class.publish(message_options)
described_class.publish(**message_options, **additional_params)
end

after do
Expand All @@ -96,6 +97,7 @@
let(:publish_logger) { double("publish_logger") }
let(:bunny) { double("bunny") }
let(:channel) { double("channel") }
let(:job_class) { Rabbit::Publishing::Job }

before do
allow(Bunny).to receive_message_chain(:new, :start).and_return(bunny)
Expand Down Expand Up @@ -132,27 +134,78 @@
confirm: {"hello":"world"}
MSG

expect { described_class.publish(message_options) }.not_to raise_error
expect { described_class.publish(**message_options) }.not_to raise_error
end

it "raises the last exception after max retries" do
allow(channel).to receive(:basic_publish).and_raise(Bunny::ConnectionClosedError.new(""))

expect { described_class.publish(message_options) }
expect { described_class.publish(**message_options) }
.to raise_error(Bunny::ConnectionClosedError)
end
end

context "realtime" do
let(:realtime) { true }
let(:expect_to_use_job) { false }
let(:expected_queue) { "default_prepared" }
let(:job_class) { Rabbit::Publishing::Job }

include_examples "publishes"
end

context "not realtime" do
let(:realtime) { false }
let(:expect_to_use_job) { true }
let(:expected_queue) { "default_prepared" }
let(:job_class) { Rabbit::Publishing::Job }

include_examples "publishes"
end

context "with custom job class" do
let(:realtime) { false }
let(:expect_to_use_job) { true }
let(:expected_queue) { "default_prepared" }
let(:job_class) { Class.new(Rabbit::Publishing::Job) }

before do
stub_const("CustomJobClass", job_class)
allow(Rabbit.config).to receive(:publishing_job_class_callable).and_return(job_class)
end

include_examples "publishes"
end

context "with custom default_publishing_job_queue" do
let(:realtime) { false }
let(:expect_to_use_job) { true }
let(:job_class) { Rabbit::Publishing::Job }
let(:default_publishing_job_queue) { :custom_queue }
let(:expected_queue) { "passed_to_method_queue" }
let(:additional_params) { { custom_queue_name: "passed_to_method_queue" } }

before do
allow(Rabbit.config).to(
receive(:default_publishing_job_queue).and_return(default_publishing_job_queue),
)
end

include_examples "publishes"
end

context "with custom queue name" do
let(:realtime) { false }
let(:expect_to_use_job) { true }
let(:job_class) { Rabbit::Publishing::Job }
let(:default_publishing_job_queue) { :custom_queue }
let(:expected_queue) { "custom_queue_prepared" }

before do
allow(Rabbit.config).to(
receive(:default_publishing_job_queue).and_return(default_publishing_job_queue),
)
end

include_examples "publishes"
end
Expand Down