@@ -65,6 +65,20 @@ def global_position(event_id)
65
65
66
66
protected
67
67
68
+ def find_event_id_in_stream ( specification_event_id , specification_stream_name )
69
+ stream_entries
70
+ . by_stream_and_event_id ( specification_stream_name , specification_event_id )
71
+ . fetch ( :id )
72
+ rescue ::ROM ::TupleCountMismatchError
73
+ raise EventNotFound . new ( specification_event_id )
74
+ end
75
+
76
+ def find_event_id_globally ( specification_event_id )
77
+ events . by_event_id ( specification_event_id ) . one! . fetch ( :id )
78
+ rescue ::ROM ::TupleCountMismatchError
79
+ raise EventNotFound . new ( specification_event_id )
80
+ end
81
+
68
82
def read_scope ( specification )
69
83
direction = specification . forward? ? :forward : :backward
70
84
@@ -73,20 +87,14 @@ def read_scope(specification)
73
87
end
74
88
75
89
if specification . stream . global?
76
- offset_entry_id = events . by_event_id ( specification . start ) . one! . fetch ( :id ) if specification . start
77
- stop_entry_id = events . by_event_id ( specification . stop ) . one! . fetch ( :id ) if specification . stop
90
+ offset_entry_id = find_event_id_globally ( specification . start ) if specification . start
91
+ stop_entry_id = find_event_id_globally ( specification . stop ) if specification . stop
78
92
79
93
query = events . ordered ( direction , offset_entry_id , stop_entry_id , specification . time_sort_by )
80
94
query = query . map_with ( :event_to_serialized_record , auto_struct : false )
81
95
else
82
- offset_entry_id =
83
- stream_entries
84
- . by_stream_and_event_id ( specification . stream , specification . start )
85
- . fetch ( :id ) if specification . start
86
- stop_entry_id =
87
- stream_entries
88
- . by_stream_and_event_id ( specification . stream , specification . stop )
89
- . fetch ( :id ) if specification . stop
96
+ offset_entry_id = find_event_id_in_stream ( specification . start , specification . stream ) if specification . start
97
+ stop_entry_id = find_event_id_in_stream ( specification . stop , specification . stream ) if specification . stop
90
98
91
99
query =
92
100
stream_entries . ordered (
0 commit comments