Skip to content

Commit 24f0db6

Browse files
100% test coverage.
1 parent e711902 commit 24f0db6

File tree

10 files changed

+470
-67
lines changed

10 files changed

+470
-67
lines changed

gems.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
gem "rubocop"
2525

2626
gem "sus-fixtures-async"
27+
gem "sus-fixtures-console"
2728

2829
gem "bake-test"
2930
gem "bake-test-external"

lib/async/job/processor/aggregate.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def flush(jobs)
3434
@delegate.call(job)
3535
end
3636
rescue => error
37-
Console::Event::Failure.for(error).emit(self, "Could not flush #{jobs.size} jobs.")
37+
Console.error(self, "Could not flush #{jobs.size} jobs.", exception: error)
3838
end
3939

4040
# Run the background processing loop that continuously processes job batches.

lib/async/job/processor/generic.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ module Processor
1010
# This processor acts as a simple wrapper around any object that responds to call, start, and stop methods.
1111
class Generic
1212
# Initialize a new generic processor.
13-
# @parameter delegate [Object | nil] The delegate object that will handle job execution.
14-
def initialize(delegate = nil)
13+
# @parameter delegate [Object] The delegate object that will handle job execution.
14+
def initialize(delegate)
1515
@delegate = delegate
1616
end
1717

lib/async/job/processor/inline.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ def call(job)
4343

4444
# Start the processor by delegating to the configured delegate.
4545
def start
46-
@delegate&.start
46+
@delegate.start
4747
end
4848

4949
# Stop the processor by delegating to the configured delegate.
5050
def stop
51-
@delegate&.stop
51+
@delegate.stop
5252
end
5353
end
5454
end

test/async/job/buffer.rb

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2024, by Samuel Williams.
5+
6+
require "async/job/buffer"
7+
require "sus/fixtures/async/reactor_context"
8+
9+
describe Async::Job::Buffer do
10+
include Sus::Fixtures::Async::ReactorContext
11+
12+
let(:buffer) {subject.new}
13+
14+
with "no delegate" do
15+
it "can be initialized without a delegate" do
16+
expect(buffer).to be_a(Async::Job::Buffer)
17+
end
18+
19+
it "starts with an empty queue" do
20+
expect(buffer).to be(:empty?)
21+
end
22+
23+
it "can add jobs to the buffer" do
24+
job = {"data" => "test job"}
25+
buffer.call(job)
26+
27+
expect(buffer).not.to be(:empty?)
28+
expect(buffer.pop).to be == job
29+
end
30+
31+
it "can add multiple jobs" do
32+
job1 = {"id" => 1}
33+
job2 = {"id" => 2}
34+
35+
buffer.call(job1)
36+
buffer.call(job2)
37+
38+
expect(buffer.pop).to be == job1
39+
expect(buffer.pop).to be == job2
40+
expect(buffer).to be(:empty?)
41+
end
42+
43+
it "can start without a delegate" do
44+
expect{buffer.start}.not.to raise_exception
45+
end
46+
47+
it "can stop without a delegate" do
48+
expect{buffer.stop}.not.to raise_exception
49+
end
50+
end
51+
52+
with "delegate" do
53+
let(:delegate) do
54+
Class.new do
55+
def initialize
56+
@jobs = []
57+
@started = false
58+
@stopped = false
59+
end
60+
61+
attr_reader :jobs, :started, :stopped
62+
63+
def call(job)
64+
@jobs << job
65+
end
66+
67+
def start
68+
@started = true
69+
end
70+
71+
def stop
72+
@stopped = true
73+
end
74+
end.new
75+
end
76+
77+
let(:buffer) {subject.new(delegate)}
78+
79+
it "can be initialized with a delegate" do
80+
expect(buffer).to be_a(Async::Job::Buffer)
81+
end
82+
83+
it "delegates job calls to the delegate" do
84+
job = {"data" => "test job"}
85+
buffer.call(job)
86+
87+
expect(delegate.jobs).to have_value(be == job)
88+
end
89+
90+
it "starts the delegate when start is called" do
91+
buffer.start
92+
expect(delegate.started).to be == true
93+
end
94+
95+
it "stops the delegate when stop is called" do
96+
buffer.stop
97+
expect(delegate.stopped).to be == true
98+
end
99+
100+
it "still maintains its own queue" do
101+
job = {"data" => "test job"}
102+
buffer.call(job)
103+
104+
expect(buffer.pop).to be == job
105+
end
106+
end
107+
end

test/async/job/builder.rb

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,75 @@
2525

2626
expect(buffer.pop).to be == "My job"
2727
end
28+
29+
it "can add dequeue middleware" do
30+
buffer = Async::Job::Buffer.new
31+
32+
# Create a pipeline with dequeue middleware:
33+
pipeline = Async::Job::Builder.build(buffer) do
34+
dequeue Async::Job::Processor::Inline
35+
end
36+
37+
# The dequeue middleware should be applied to the server side
38+
expect(pipeline.server).to be_a(Async::Job::Processor::Inline)
39+
expect(pipeline.client).to be_a(Async::Job::Processor::Inline)
40+
expect(pipeline.delegate).to be == buffer
41+
end
42+
43+
it "can set delegate using delegate method" do
44+
custom_delegate = Object.new
45+
46+
# Create a pipeline with custom delegate:
47+
pipeline = Async::Job::Builder.build do
48+
delegate custom_delegate
49+
end
50+
51+
expect(pipeline.delegate).to be == custom_delegate
52+
expect(pipeline.client).to be == custom_delegate
53+
expect(pipeline.server).to be == custom_delegate
54+
end
55+
56+
it "can build with both enqueue and dequeue middleware" do
57+
buffer = Async::Job::Buffer.new
58+
59+
# Create a pipeline with both enqueue and dequeue middleware:
60+
pipeline = Async::Job::Builder.build(buffer) do
61+
enqueue Async::Job::Processor::Inline
62+
dequeue Async::Job::Processor::Inline
63+
end
64+
65+
# Both client and server should be wrapped with Inline processors
66+
expect(pipeline.client).to be_a(Async::Job::Processor::Inline)
67+
expect(pipeline.server).to be_a(Async::Job::Processor::Inline)
68+
expect(pipeline.delegate).to be == buffer
69+
end
70+
71+
it "can build with custom delegate parameter" do
72+
custom_delegate = Object.new
73+
74+
# Create a pipeline with custom delegate passed to build:
75+
pipeline = Async::Job::Builder.build(custom_delegate) do
76+
enqueue Async::Job::Processor::Inline
77+
end
78+
79+
expect(pipeline.delegate).to be == custom_delegate
80+
expect(pipeline.client).to be_a(Async::Job::Processor::Inline)
81+
expect(pipeline.server).to be == custom_delegate
82+
end
83+
84+
it "can chain middleware methods" do
85+
buffer = Async::Job::Buffer.new
86+
87+
# Test method chaining:
88+
builder = Async::Job::Builder.new(buffer)
89+
result = builder.enqueue(Async::Job::Processor::Inline)
90+
.dequeue(Async::Job::Processor::Inline)
91+
.delegate(buffer)
92+
93+
expect(result).to be == builder
94+
95+
# Build the pipeline:
96+
pipeline = builder.build
97+
expect(pipeline.delegate).to be == buffer
98+
end
2899
end

test/async/job/processor/aggregate.rb

Lines changed: 70 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,63 +3,84 @@
33
# Released under the MIT License.
44
# Copyright, 2024, by Samuel Williams.
55

6-
require "async"
7-
6+
require "async/job/processor/aggregate"
87
require "sus/fixtures/async/reactor_context"
8+
require "sus/fixtures/console/captured_logger"
99

10-
require "async/job/buffer"
11-
require "async/job/processor/aggregate"
10+
describe Async::Job::Processor::Aggregate do
11+
include Sus::Fixtures::Async::ReactorContext
12+
include_context Sus::Fixtures::Console::CapturedLogger
1213

13-
class SlowQueue
14-
def initialize(delegate = nil)
15-
@condition = Async::Condition.new
16-
@delegate = delegate
14+
let(:delegate) do
15+
Class.new do
16+
attr_reader :called_jobs, :started, :stopped
17+
def initialize
18+
@called_jobs = []
19+
@started = false
20+
@stopped = false
21+
end
22+
def call(job)
23+
@called_jobs << job
24+
end
25+
def start
26+
@started = true
27+
"started"
28+
end
29+
def stop
30+
@stopped = true
31+
"stopped"
32+
end
33+
end.new
1734
end
18-
19-
attr :condition
20-
21-
def call(job)
22-
@condition.wait
23-
@delegate&.call(job)
35+
36+
let(:processor) {subject.new(delegate)}
37+
38+
it "processes jobs in batches" do
39+
processor.call(:job1)
40+
processor.call(:job2)
41+
42+
# Give the processor a chance to process jobs
43+
sleep(0.1)
44+
45+
expect(delegate.called_jobs).to be(:include?, :job1)
46+
expect(delegate.called_jobs).to be(:include?, :job2)
2447
end
25-
26-
def start
27-
@delegate&.start
48+
49+
it "handles errors in flush" do
50+
error_delegate = Class.new do
51+
def call(job)
52+
raise "Test error"
53+
end
54+
end.new
55+
56+
error_processor = subject.new(error_delegate)
57+
error_processor.call(:job1)
58+
# Give the processor a chance to process jobs and handle error
59+
sleep(0.1)
60+
61+
# Assert that the error was logged
62+
expect_console.to have_logged(
63+
severity: be == :error,
64+
message: be(:include?, "Could not flush")
65+
)
2866
end
29-
30-
def stop
31-
@delegate&.stop
67+
68+
it "delegates start to super and start!" do
69+
# This will call super (Generic#start) and then start!
70+
# We can't easily check super, but we can check that start! returns true
71+
result = processor.send(:start!)
72+
expect(result).to be == true
3273
end
33-
end
3474

35-
describe Async::Job::Processor::Aggregate do
36-
include Sus::Fixtures::Async::ReactorContext
37-
38-
let(:buffer) {Async::Job::Buffer.new}
39-
let(:server) {subject.new(buffer)}
40-
41-
let(:job) {{"data" => "test job"}}
42-
43-
it "can schedule a job" do
44-
server.call(job)
45-
46-
expect(buffer.pop).to be == job
75+
it "delegates start to delegate" do
76+
result = processor.start
77+
expect(delegate.started).to be == true
78+
expect(result).to be == true
4779
end
48-
49-
with "slow queue" do
50-
let(:queue) {SlowQueue.new(buffer)}
51-
let(:server) {subject.new(queue)}
52-
53-
it "flushes jobs on shutdown" do
54-
server.call(job)
55-
server.stop
56-
57-
expect(buffer).to be(:empty?)
58-
59-
# Allow job processing to continue:
60-
queue.condition.signal
61-
62-
expect(buffer.pop).to be == job
63-
end
80+
81+
it "delegates stop to delegate" do
82+
result = processor.stop
83+
expect(delegate.stopped).to be == true
84+
expect(result).to be == "stopped"
6485
end
6586
end

0 commit comments

Comments
 (0)