Skip to content

Commit 23a1384

Browse files
committed
New task OnChannelTask
1 parent de221d0 commit 23a1384

File tree

2 files changed

+82
-27
lines changed

2 files changed

+82
-27
lines changed

spec/marmot_spec.cr

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,35 @@ describe Marmot do
9797
end
9898
end
9999

100+
describe "#on" do
101+
it "schedules a new task that runs on messages on a channel" do
102+
to_task = Channel(Int32).new
103+
from_task = Channel(Int32).new
104+
105+
task = Marmot.on(to_task) do |t|
106+
t = t.as(Marmot::OnChannelTask)
107+
from_task.send(t.value.not_nil! * 2)
108+
next
109+
end
110+
spawn Marmot.run
111+
112+
expect_channel_none(from_task)
113+
114+
to_task.send(1)
115+
Fiber.yield
116+
expect_channel_eq(from_task, 2)
117+
118+
to_task.send(42)
119+
Fiber.yield
120+
expect_channel_eq(from_task, 84)
121+
122+
task.cancel
123+
to_task.send(1)
124+
Fiber.yield
125+
expect_channel_none(from_task)
126+
end
127+
end
128+
100129
describe "#run" do
101130
it "runs a task without arguments" do
102131
channel = Channel(Int32).new

src/marmot.cr

Lines changed: 53 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ module Marmot
1515
getter tick = Channel(Task).new
1616

1717
# :nodoc:
18-
abstract def start : Nil
18+
abstract def wait_next_tick : Nil
1919

2020
# Cancels the task.
2121
#
@@ -34,20 +34,12 @@ module Marmot
3434
@callback.call(self)
3535
rescue
3636
end
37-
end
38-
39-
class Repeat < Task
40-
def initialize(@span : Time::Span, @first_run : Bool, @callback : Callback)
41-
end
4237

38+
# :nodoc:
4339
def start : Nil
4440
spawn do
4541
while !canceled?
46-
if @first_run
47-
@first_run = false
48-
else
49-
sleep @span
50-
end
42+
wait_next_tick
5143

5244
# The task could have been canceled while we were sleeping.
5345
if !canceled?
@@ -60,17 +52,12 @@ module Marmot
6052
end
6153
end
6254

63-
class Cron < Task
55+
class CronTask < Task
6456
def initialize(@hour : Int32, @minute : Int32, @second : Int32, @callback : Callback)
6557
end
6658

67-
def start : Nil
68-
spawn do
69-
while !@canceled
70-
sleep span
71-
@tick.send(self)
72-
end
73-
end
59+
def wait_next_tick : Nil
60+
sleep span
7461
end
7562

7663
private def span
@@ -99,21 +86,60 @@ module Marmot
9986
end
10087
end
10188

89+
class OnChannelTask(T) < Task
90+
getter value : T? = nil
91+
92+
def initialize(@channel : Channel(T), @callback : Callback)
93+
end
94+
95+
def wait_next_tick : Nil
96+
@value = @channel.receive?
97+
end
98+
end
99+
100+
class RepeatTask < Task
101+
def initialize(@span : Time::Span, @first_run : Bool, @callback : Callback)
102+
end
103+
104+
def wait_next_tick : Nil
105+
if @first_run
106+
@first_run = false
107+
else
108+
sleep @span
109+
end
110+
end
111+
end
112+
102113
extend self
103114

104-
# Runs a task every given *span*.
115+
# Runs a task every day at *hour* and *minute*.
116+
def cron(hour, minute, second = 0, &block : Callback) : Task
117+
task = CronTask.new(hour, minute, second, block)
118+
@@tasks << task
119+
task
120+
end
121+
122+
# Runs a task when a value is received on a channel.
105123
#
106-
# If first run is true, it will run as soon as the scheduler runs.
107-
# Else it will wait *span* time, then run a first time.
108-
def repeat(span : Time::Span, first_run = false, &block : Callback) : Task
109-
task = Repeat.new(span, first_run, block)
124+
# To access the value, you need to restrict the type of the task, and use
125+
# `OnChannelTask#value`.
126+
#
127+
# ```
128+
# channel = Channel(Int32).new
129+
# Marmot.on(channel) { |task| puts task.as(OnChannelTask).value }
130+
# ```
131+
def on(channel, &block : Callback) : Task
132+
task = OnChannelTask.new(channel, block)
110133
@@tasks << task
111134
task
112135
end
113136

114-
# Runs a task every day at *hour* and *minute*.
115-
def cron(hour, minute, second = 0, &block : Callback) : Task
116-
task = Cron.new(hour, minute, second, block)
137+
# Runs a task every given *span*.
138+
#
139+
# If first run is true, it will run as soon as the scheduler runs.
140+
# Else it will wait *span* time, then run a first time.
141+
def repeat(span : Time::Span, first_run = false, &block : Callback) : Task
142+
task = RepeatTask.new(span, first_run, block)
117143
@@tasks << task
118144
task
119145
end

0 commit comments

Comments
 (0)