Skip to content

Commit 106dcb2

Browse files
committed
Move promises to core
1 parent 85ddb7c commit 106dcb2

File tree

4 files changed

+2
-152
lines changed

4 files changed

+2
-152
lines changed

lib/concurrent-edge.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@
88

99
require 'concurrent/edge/lock_free_linked_set'
1010
require 'concurrent/edge/lock_free_queue'
11-
require 'concurrent/edge/lock_free_stack'
1211

13-
require 'concurrent/edge/promises'
1412
require 'concurrent/edge/cancellation'
1513
require 'concurrent/edge/throttle'
1614

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
require 'concurrent/settable_struct'
3232
require 'concurrent/timer_task'
3333
require 'concurrent/tvar'
34+
require 'concurrent/promises'
3435

3536
require 'concurrent/thread_safe/synchronized_delegator'
3637
require 'concurrent/thread_safe/util'

lib/concurrent/edge/promises.rb renamed to lib/concurrent/promises.rb

Lines changed: 1 addition & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
require 'concurrent/synchronization'
22
require 'concurrent/atomic/atomic_boolean'
33
require 'concurrent/atomic/atomic_fixnum'
4-
require 'concurrent/edge/lock_free_stack'
4+
require 'concurrent/collection/lock_free_stack'
55
require 'concurrent/errors'
66

77
module Concurrent
88

9-
109
# {include:file:doc/promises-main.md}
1110
module Promises
1211

@@ -1961,151 +1960,3 @@ def initialize(default_executor, intended_time)
19611960

19621961
end
19631962
end
1964-
1965-
# TODO try stealing pool, each thread has it's own queue
1966-
# TODO (pitr-ch 18-Dec-2016): doc macro debug method
1967-
# TODO (pitr-ch 18-Dec-2016): add macro noting that debug methods may change api without warning
1968-
1969-
1970-
module Concurrent
1971-
module Promises
1972-
1973-
class Future < AbstractEventFuture
1974-
1975-
module ActorIntegration
1976-
# Asks the actor with its value.
1977-
# @return [Future] new future with the response form the actor
1978-
def then_ask(actor)
1979-
self.then { |v| actor.ask(v) }.flat
1980-
end
1981-
end
1982-
1983-
include ActorIntegration
1984-
end
1985-
1986-
class Channel < Concurrent::Synchronization::Object
1987-
safe_initialization!
1988-
1989-
# Default size of the Channel, makes it accept unlimited number of messages.
1990-
UNLIMITED = ::Object.new
1991-
UNLIMITED.singleton_class.class_eval do
1992-
include Comparable
1993-
1994-
def <=>(other)
1995-
1
1996-
end
1997-
1998-
def to_s
1999-
'unlimited'
2000-
end
2001-
end
2002-
2003-
# A channel to pass messages between promises. The size is limited to support back pressure.
2004-
# @param [Integer, UNLIMITED] size the maximum number of messages stored in the channel.
2005-
def initialize(size = UNLIMITED)
2006-
super()
2007-
@Size = size
2008-
# TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
2009-
@Mutex = Mutex.new
2010-
@Probes = []
2011-
@Messages = []
2012-
@PendingPush = []
2013-
end
2014-
2015-
2016-
# Returns future which will fulfill when the message is added to the channel. Its value is the message.
2017-
# @param [Object] message
2018-
# @return [Future]
2019-
def push(message)
2020-
@Mutex.synchronize do
2021-
while true
2022-
if @Probes.empty?
2023-
if @Size > @Messages.size
2024-
@Messages.push message
2025-
return Promises.fulfilled_future message
2026-
else
2027-
pushed = Promises.resolvable_future
2028-
@PendingPush.push [message, pushed]
2029-
return pushed.with_hidden_resolvable
2030-
end
2031-
else
2032-
probe = @Probes.shift
2033-
if probe.fulfill [self, message], false
2034-
return Promises.fulfilled_future(message)
2035-
end
2036-
end
2037-
end
2038-
end
2039-
end
2040-
2041-
# Returns a future witch will become fulfilled with a value from the channel when one is available.
2042-
# @param [ResolvableFuture] probe the future which will be fulfilled with a channel value
2043-
# @return [Future] the probe, its value will be the message when available.
2044-
def pop(probe = Concurrent::Promises.resolvable_future)
2045-
# TODO (pitr-ch 26-Dec-2016): improve performance
2046-
pop_for_select(probe).then(&:last)
2047-
end
2048-
2049-
# @!visibility private
2050-
def pop_for_select(probe = Concurrent::Promises.resolvable_future)
2051-
@Mutex.synchronize do
2052-
if @Messages.empty?
2053-
@Probes.push probe
2054-
else
2055-
message = @Messages.shift
2056-
probe.fulfill [self, message]
2057-
2058-
unless @PendingPush.empty?
2059-
message, pushed = @PendingPush.shift
2060-
@Messages.push message
2061-
pushed.fulfill message
2062-
end
2063-
end
2064-
end
2065-
probe
2066-
end
2067-
2068-
# @return [String] Short string representation.
2069-
def to_s
2070-
format '%s size:%s>', super[0..-2], @Size
2071-
end
2072-
2073-
alias_method :inspect, :to_s
2074-
end
2075-
2076-
class Future < AbstractEventFuture
2077-
module NewChannelIntegration
2078-
2079-
# @param [Channel] channel to push to.
2080-
# @return [Future] a future which is fulfilled after the message is pushed to the channel.
2081-
# May take a moment if the channel is full.
2082-
def then_push_channel(channel)
2083-
self.then { |value| channel.push value }.flat_future
2084-
end
2085-
2086-
# TODO (pitr-ch 26-Dec-2016): does it make sense to have rescue an chain variants as well, check other integrations as well
2087-
end
2088-
2089-
include NewChannelIntegration
2090-
end
2091-
2092-
module FactoryMethods
2093-
2094-
module NewChannelIntegration
2095-
2096-
# Selects a channel which is ready to be read from.
2097-
# @param [Channel] channels
2098-
# @return [Future] a future which is fulfilled with pair [channel, message] when one of the channels is
2099-
# available for reading
2100-
def select_channel(*channels)
2101-
probe = Promises.resolvable_future
2102-
channels.each { |ch| ch.pop_for_select probe }
2103-
probe
2104-
end
2105-
end
2106-
2107-
include NewChannelIntegration
2108-
end
2109-
2110-
end
2111-
end

0 commit comments

Comments
 (0)