@@ -23,7 +23,7 @@ def by_event_id(event_id)
23
23
end
24
24
25
25
def by_event_type ( types )
26
- join ( :events ) . where ( event_type : types )
26
+ join_events . where ( event_type : types )
27
27
end
28
28
29
29
def by_stream_and_event_id ( stream , event_id )
@@ -35,19 +35,19 @@ def max_position(stream)
35
35
end
36
36
37
37
def newer_than ( time )
38
- join ( :events ) . where { |r | r . events [ :created_at ] > time . localtime }
38
+ join_events . where { |r | r . events [ :created_at ] > time . localtime }
39
39
end
40
40
41
41
def newer_than_or_equal ( time )
42
- join ( :events ) . where { |r | r . events [ :created_at ] >= time . localtime }
42
+ join_events . where { |r | r . events [ :created_at ] >= time . localtime }
43
43
end
44
44
45
45
def older_than ( time )
46
- join ( :events ) . where { |r | r . events [ :created_at ] < time . localtime }
46
+ join_events . where { |r | r . events [ :created_at ] < time . localtime }
47
47
end
48
48
49
49
def older_than_or_equal ( time )
50
- join ( :events ) . where { |r | r . events [ :created_at ] <= time . localtime }
50
+ join_events . where { |r | r . events [ :created_at ] <= time . localtime }
51
51
end
52
52
53
53
DIRECTION_MAP = { forward : %i[ asc > < ] , backward : %i[ desc < > ] } . freeze
@@ -74,7 +74,15 @@ def ordered(direction, stream, offset_entry_id = nil, stop_entry_id = nil, time_
74
74
if event_order_columns . empty?
75
75
query . order { |r | stream_order_columns . map { |c | r [ :stream_entries ] [ c ] . public_send ( order ) } }
76
76
else
77
- query . join ( :events ) . order { |r | event_order_columns . map { |c | r . events [ c ] . public_send ( order ) } }
77
+ query . join_events . order { |r | event_order_columns . map { |c | r . events [ c ] . public_send ( order ) } }
78
+ end
79
+ end
80
+
81
+ def join_events
82
+ if dataset . opts [ :join ] &.map ( &:table ) &.include? ( events . dataset . first_source_table )
83
+ self
84
+ else
85
+ join ( :events )
78
86
end
79
87
end
80
88
end
0 commit comments