Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ruby_event_store/lib/ruby_event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
require_relative "ruby_event_store/client"
require_relative "ruby_event_store/metadata"
require_relative "ruby_event_store/specification"
require_relative "ruby_event_store/specification_result"
require_relative "ruby_event_store/specification_reader"
require_relative "ruby_event_store/event"
require_relative "ruby_event_store/stream"
Expand Down
135 changes: 82 additions & 53 deletions ruby_event_store/lib/ruby_event_store/specification.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,59 @@
module RubyEventStore
# Used for building and executing the query specification.
class Specification
Result =
Data.define(
:direction,
:start,
:stop,
:older_than,
:older_than_or_equal,
:newer_than,
:newer_than_or_equal,
:time_sort_by,
:limit,
:stream,
:read_as,
:batch_size,
:with_ids,
:with_types,
) do
def limit? = !limit.infinite?
def with_ids? = !with_ids.nil?
def with_types? = !with_types.empty?
def forward? = direction.equal?(:forward)
def backward? = direction.equal?(:backward)
def batched? = read_as.equal?(:batch)
def first? = read_as.equal?(:first)
def last? = read_as.equal?(:last)
def all? = read_as.equal?(:all)
def time_sort_by_as_at? = time_sort_by.equal?(:as_at)
def time_sort_by_as_of? = time_sort_by.equal?(:as_of)
end

DEFAULT_BATCH_SIZE = 100

# @api private
# @private
def initialize(reader, result = SpecificationResult.new)
def initialize(
reader,
result = SpecificationResult.new(
direction: :forward,
start: nil,
stop: nil,
older_than: nil,
older_than_or_equal: nil,
newer_than: nil,
newer_than_or_equal: nil,
time_sort_by: nil,
limit: Float::INFINITY,
stream: Stream.new(GLOBAL_STREAM),
read_as: :all,
batch_size: DEFAULT_BATCH_SIZE,
with_ids: nil,
with_types: [],
)
)
@reader = reader
@result = result
end
Expand All @@ -18,7 +66,7 @@ def initialize(reader, result = SpecificationResult.new)
# @param stream_name [String] name of the stream to get events from
# @return [Specification]
def stream(stream_name)
Specification.new(reader, result.dup { |r| r.stream = Stream.new(stream_name) })
Specification.new(reader, result.with(stream: Stream.new(stream_name)))
end

# Limits the query to events before or after another event.
Expand All @@ -28,7 +76,8 @@ def stream(stream_name)
# @return [Specification]
def from(start)
raise InvalidPageStart if start.nil? || start.empty?
Specification.new(reader, result.dup { |r| r.start = start })

Specification.new(reader, result.with(start: start))
end

# Limits the query to events before or after another event.
Expand All @@ -38,7 +87,8 @@ def from(start)
# @return [Specification]
def to(stop)
raise InvalidPageStop if stop.nil? || stop.empty?
Specification.new(reader, result.dup { |r| r.stop = stop })

Specification.new(reader, result.with(stop: stop))
end

# Limits the query to events that occurred before given time.
Expand All @@ -48,13 +98,8 @@ def to(stop)
# @return [Specification]
def older_than(time)
raise ArgumentError unless time.respond_to?(:to_time)
Specification.new(
reader,
result.dup do |r|
r.older_than = time
r.older_than_or_equal = nil
end,
)

Specification.new(reader, result.with(older_than: time, older_than_or_equal: nil))
end

# Limits the query to events that occurred on or before given time.
Expand All @@ -64,13 +109,8 @@ def older_than(time)
# @return [Specification]
def older_than_or_equal(time)
raise ArgumentError unless time.respond_to?(:to_time)
Specification.new(
reader,
result.dup do |r|
r.older_than = nil
r.older_than_or_equal = time
end,
)

Specification.new(reader, result.with(older_than: nil, older_than_or_equal: time))
end

# Limits the query to events that occurred after given time.
Expand All @@ -80,13 +120,8 @@ def older_than_or_equal(time)
# @return [Specification]
def newer_than(time)
raise ArgumentError unless time.respond_to?(:to_time)
Specification.new(
reader,
result.dup do |r|
r.newer_than_or_equal = nil
r.newer_than = time
end,
)

Specification.new(reader, result.with(newer_than: time, newer_than_or_equal: nil))
end

# Limits the query to events that occurred on or after given time.
Expand All @@ -96,13 +131,8 @@ def newer_than(time)
# @return [Specification]
def newer_than_or_equal(time)
raise ArgumentError unless time.respond_to?(:to_time)
Specification.new(
reader,
result.dup do |r|
r.newer_than_or_equal = time
r.newer_than = nil
end,
)

Specification.new(reader, result.with(newer_than: nil, newer_than_or_equal: time))
end

# Limits the query to events within given time range.
Expand All @@ -123,31 +153,31 @@ def between(time_range)
#
# @return [Specification]
def as_at
Specification.new(reader, result.dup { |r| r.time_sort_by = :as_at })
Specification.new(reader, result.with(time_sort_by: :as_at))
end

# Sets the order of time sorting using validity time
# {http://railseventstore.org/docs/read/ Find out more}
#
# @return [Specification]
def as_of
Specification.new(reader, result.dup { |r| r.time_sort_by = :as_of })
Specification.new(reader, result.with(time_sort_by: :as_of))
end

# Sets the order of reading events to ascending (forward from the start).
# {http://railseventstore.org/docs/read/ Find out more}.
#
# @return [Specification]
def forward
Specification.new(reader, result.dup { |r| r.direction = :forward })
Specification.new(reader, result.with(direction: :forward))
end

# Sets the order of reading events to descending (backward from the start).
# {http://railseventstore.org/docs/read/ Find out more}.
#
# @return [Specification]
def backward
Specification.new(reader, result.dup { |r| r.direction = :backward })
Specification.new(reader, result.with(direction: :backward))
end

# Limits the query to specified number of events.
Expand All @@ -157,7 +187,8 @@ def backward
# @return [Specification]
def limit(count)
raise InvalidPageSize unless count && count > 0
Specification.new(reader, result.dup { |r| r.count = count })

Specification.new(reader, result.with(limit: count))
end

# Executes the query based on the specification built up to this point.
Expand All @@ -166,10 +197,10 @@ def limit(count)
#
# @yield [Array<Event>] batch of events
# @return [Enumerator, nil] Enumerator is returned when block not given
def each_batch
def each_batch(&block)
return to_enum(:each_batch) unless block_given?

reader.each(in_batches(result.batch_size).result) { |batch| yield batch }
reader.each(in_batches(result.batch_size).result, &block)
end

# Executes the query based on the specification built up to this point.
Expand All @@ -178,10 +209,10 @@ def each_batch
#
# @yield [Event] event
# @return [Enumerator, nil] Enumerator is returned when block not given
def each
def each(&block)
return to_enum unless block_given?

each_batch { |batch| batch.each { |event| yield event } }
each_batch { |batch| batch.each(&block) }
end

# Executes the query based on the specification built up to this point
Expand All @@ -191,6 +222,7 @@ def each
# @return [Array] of mapped result
def map(&block)
raise ArgumentError.new("Block must be given") unless block_given?

each.map(&block)
end

Expand All @@ -202,6 +234,7 @@ def map(&block)
# @return reduce result as defined by block given
def reduce(accumulator = nil, &block)
raise ArgumentError.new("Block must be given") unless block_given?

each.reduce(accumulator, &block)
end

Expand Down Expand Up @@ -236,13 +269,7 @@ def to_a
# @param batch_size [Integer] number of events to read in a single batch
# @return [Specification]
def in_batches(batch_size = DEFAULT_BATCH_SIZE)
Specification.new(
reader,
result.dup do |r|
r.read_as = :batch
r.batch_size = batch_size
end,
)
Specification.new(reader, result.with(read_as: :batch, batch_size: batch_size))
end
alias in_batches_of in_batches

Expand All @@ -251,15 +278,15 @@ def in_batches(batch_size = DEFAULT_BATCH_SIZE)
#
# @return [Specification]
def read_first
Specification.new(reader, result.dup { |r| r.read_as = :first })
Specification.new(reader, result.with(read_as: :first))
end

# Specifies that only last event should be read.
# {http://railseventstore.org/docs/read/ Find out more}.
#
# @return [Specification]
def read_last
Specification.new(reader, result.dup { |r| r.read_as = :last })
Specification.new(reader, result.with(read_as: :last))
end

# Executes the query based on the specification built up to this point.
Expand All @@ -286,17 +313,17 @@ def last
# @types [Class, Array(Class)] types of event to look for.
# @return [Specification]
def of_type(*types)
Specification.new(reader, result.dup { |r| r.with_types = types.flatten })
Specification.new(reader, result.with(with_types: types.flatten.map(&:to_s)))
end
alias_method :of_types, :of_type
alias of_types of_type

# Limits the query to certain events by given even ids.
# {http://railseventstore.org/docs/read/ Find out more}.
#
# @param event_ids [Array(String)] ids of event to look for.
# @return [Specification]
def with_id(event_ids)
Specification.new(reader, result.dup { |r| r.with_ids = event_ids })
Specification.new(reader, result.with(with_ids: event_ids))
end

# Reads single event from repository.
Expand Down Expand Up @@ -336,4 +363,6 @@ def events(event_ids)

attr_reader :reader
end

SpecificationResult = Specification::Result
end
Loading