Skip to content

Commit 8595767

Browse files
Support subscriber filtering
1 parent 387e6bf commit 8595767

File tree

2 files changed

+48
-4
lines changed

2 files changed

+48
-4
lines changed

activesupport/lib/active_support/event_reporter.rb

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,16 @@ def clear
8282
# # source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
8383
# # }
8484
#
85+
# ==== Filtered Subscriptions
86+
#
87+
# Subscribers can be configured with an optional filter proc to only receive a subset of events:
88+
#
89+
# # Only receive events with names starting with "user."
90+
# Rails.event.subscribe(user_subscriber) { |event| event[:name].start_with?("user.") }
91+
#
92+
# # Only receive events with specific payload types
93+
# Rails.event.subscribe(audit_subscriber) { |event| event[:payload].is_a?(AuditEvent) }
94+
#
8595
# The +notify+ API can receive either an event name and a payload hash, or an event object. Names are coerced to strings.
8696
#
8797
# ==== Event Objects
@@ -271,12 +281,17 @@ def initialize(*subscribers, raise_on_error: false, tags: nil)
271281
# timestamp: Float (The timestamp of the event, in nanoseconds)
272282
# source_location: Hash (The source location of the event, containing the filepath, lineno, and label)
273283
#
274-
def subscribe(subscriber)
284+
# An optional filter proc can be provided to only receive a subset of events:
285+
#
286+
# Rails.event.subscribe(subscriber) { |event| event[:name].start_with?("user.") }
287+
# Rails.event.subscribe(subscriber) { |event| event[:payload].is_a?(UserEvent) }
288+
#
289+
def subscribe(subscriber, &filter)
275290
unless subscriber.respond_to?(:emit)
276291
raise ArgumentError, "Event subscriber #{subscriber.class.name} must respond to #emit"
277292
end
278293

279-
@subscribers << subscriber
294+
@subscribers << { subscriber: subscriber, filter: filter }
280295
end
281296

282297
# Reports an event to all registered subscribers. An event name and payload can be provided:
@@ -333,7 +348,12 @@ def notify(name_or_object, payload = nil, caller_depth: 1, **kwargs)
333348
event[:source_location] = source_location
334349
end
335350

336-
subscribers.each do |subscriber|
351+
subscribers.each do |subscriber_entry|
352+
subscriber = subscriber_entry[:subscriber]
353+
filter = subscriber_entry[:filter]
354+
355+
next if filter && !filter.call(event)
356+
337357
subscriber.emit(event)
338358
rescue => subscriber_error
339359
if raise_on_error

activesupport/test/event_reporter_test.rb

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,16 @@ def initialize(http_method, http_status)
4242
test "#subscribe" do
4343
reporter = ActiveSupport::EventReporter.new
4444
reporter.subscribe(@subscriber)
45-
assert_equal([@subscriber], reporter.subscribers)
45+
assert_equal([{ subscriber: @subscriber, filter: nil }], reporter.subscribers)
46+
end
47+
48+
test "#subscribe with filter" do
49+
reporter = ActiveSupport::EventReporter.new
50+
51+
filter = ->(event) { event[:name].start_with?("user.") }
52+
reporter.subscribe(@subscriber, &filter)
53+
54+
assert_equal([{ subscriber: @subscriber, filter: filter }], reporter.subscribers)
4655
end
4756

4857
test "#subscribe raises ArgumentError when sink doesn't respond to emit" do
@@ -63,6 +72,21 @@ def initialize(http_method, http_status)
6372
end
6473
end
6574

75+
test "#notify filters" do
76+
reporter = ActiveSupport::EventReporter.new
77+
reporter.subscribe(@subscriber) { |event| event[:name].start_with?("user_") }
78+
79+
assert_not_called(@subscriber, :emit) do
80+
reporter.notify(:test_event)
81+
end
82+
83+
assert_called_with(@subscriber, :emit, [
84+
event_matcher(name: "user_event")
85+
]) do
86+
reporter.notify(:user_event)
87+
end
88+
end
89+
6690
test "#notify with name and hash payload" do
6791
assert_called_with(@subscriber, :emit, [
6892
event_matcher(name: "test_event", payload: { key: "value" })

0 commit comments

Comments
 (0)