@@ -27,7 +27,7 @@ def append_to_stream(records, stream, expected_version)
2727 data : serialized_record . data ,
2828 metadata : serialized_record . metadata ,
2929 created_at : record . timestamp ,
30- valid_at : record . valid_at
30+ valid_at : optimize_timestamp ( record . valid_at , record . timestamp )
3131 )
3232 unless stream . global?
3333 @db [ :event_store_events_in_streams ] . insert (
@@ -102,7 +102,7 @@ def last_stream_event(stream)
102102 data : event [ :data ] ,
103103 metadata : event [ :metadata ] ,
104104 timestamp : event [ :created_at ] . iso8601 ( TIMESTAMP_PRECISION ) ,
105- valid_at : event [ :valid_at ] . iso8601 ( TIMESTAMP_PRECISION )
105+ valid_at : ( event [ :valid_at ] || event [ :created_at ] ) . iso8601 ( TIMESTAMP_PRECISION )
106106 )
107107 . deserialize ( @serializer )
108108 end
@@ -160,6 +160,10 @@ def streams_of(event_id)
160160
161161 private
162162
163+ def optimize_timestamp ( valid_at , created_at )
164+ valid_at unless valid_at . eql? ( created_at )
165+ end
166+
163167 def record ( h )
164168 SerializedRecord
165169 . new (
@@ -304,7 +308,7 @@ def read_from_global_stream(specification)
304308 end
305309
306310 dataset = dataset . order ( :created_at ) if specification . time_sort_by_as_at?
307- dataset = dataset . order ( :valid_at ) if specification . time_sort_by_as_of?
311+ dataset = dataset . order ( :: Sequel . lit ( coalesced_date ) ) if specification . time_sort_by_as_of?
308312 dataset = dataset . limit ( specification . limit ) if specification . limit?
309313 dataset = dataset . order ( ::Sequel [ :event_store_events ] [ :id ] ) unless specification . time_sort_by
310314 dataset = dataset . reverse if specification . backward?
0 commit comments