@@ -7,39 +7,14 @@ class EventRepository
7
7
8
8
def initialize ( model_factory : WithDefaultModels . new , serializer :)
9
9
@serializer = serializer
10
- @event_klass , @stream_klass = model_factory . call
11
- if serializer == NULL && json_data_type?
12
- warn <<~MSG
13
- The data or metadata column is of a JSON/B type and expects a JSON string.
14
-
15
- Yet the repository serializer is configured as #{ serializer } and it would not
16
- produce the expected JSON string.
17
-
18
- In ActiveRecord there's an implicit serialization to JSON for JSON/B column types
19
- that made it work so far. This behaviour is unfortunately also a source of undesired
20
- double serialization — first in the EventRepository, second in the ActiveRecord.
21
-
22
- In the past we've advised workarounds that introduced configuration incosistency
23
- with other data types and serialization formats, i.e. explicitly passing NULL serializer
24
- just for the JSON/B data types.
25
-
26
- As of now this special ActiveRecord behaviour is disabled. You should be using JSON
27
- serializer back again:
28
-
29
- RubyEventStore::ActiveRecord::EventRepository.new(serializer: JSON)
30
- MSG
31
- else
32
- @event_klass . include ( SkipJsonSerialization )
33
- end
34
- @repo_reader = EventRepositoryReader . new ( @event_klass , @stream_klass , serializer )
35
- @index_violation_detector = IndexViolationDetector . new ( @event_klass . table_name , @stream_klass . table_name )
10
+ @model_factory = model_factory
36
11
end
37
12
38
13
def rescue_from_double_json_serialization!
39
- if @serializer == JSON && json_data_type?
40
- @ repo_reader. instance_eval { alias __record__ record }
14
+ if @serializer == JSON && json_data_type? ( event_klass )
15
+ repo_reader . instance_eval { alias __record__ record }
41
16
42
- @ repo_reader. define_singleton_method :unwrap do |column_name , payload |
17
+ repo_reader . define_singleton_method :unwrap do |column_name , payload |
43
18
if String === payload && payload . start_with? ( "\{ " )
44
19
warn "Double serialization of #{ column_name } column detected"
45
20
@serializer . load ( payload )
@@ -48,7 +23,7 @@ def rescue_from_double_json_serialization!
48
23
end
49
24
end
50
25
51
- @ repo_reader. define_singleton_method :record do |record |
26
+ repo_reader . define_singleton_method :record do |record |
52
27
r = __record__ ( record )
53
28
54
29
Record . new (
@@ -76,31 +51,31 @@ def link_to_stream(event_ids, stream, expected_version)
76
51
end
77
52
78
53
def delete_stream ( stream )
79
- @ stream_klass. where ( stream : stream . name ) . delete_all
54
+ stream_klass . where ( stream : stream . name ) . delete_all
80
55
end
81
56
82
57
def has_event? ( event_id )
83
- @ repo_reader. has_event? ( event_id )
58
+ repo_reader . has_event? ( event_id )
84
59
end
85
60
86
61
def last_stream_event ( stream )
87
- @ repo_reader. last_stream_event ( stream )
62
+ repo_reader . last_stream_event ( stream )
88
63
end
89
64
90
65
def read ( specification )
91
- @ repo_reader. read ( specification )
66
+ repo_reader . read ( specification )
92
67
end
93
68
94
69
def count ( specification )
95
- @ repo_reader. count ( specification )
70
+ repo_reader . count ( specification )
96
71
end
97
72
98
73
def update_messages ( records )
99
74
hashes = records . map { |record | upsert_hash ( record , record . serialize ( @serializer ) ) }
100
75
for_update = records . map ( &:event_id )
101
76
start_transaction do
102
77
existing =
103
- @ event_klass
78
+ event_klass
104
79
. where ( event_id : for_update )
105
80
. pluck ( :event_id , :id , :created_at )
106
81
. reduce ( { } ) { |acc , ( event_id , id , created_at ) | acc . merge ( event_id => [ id , created_at ] ) }
@@ -109,31 +84,31 @@ def update_messages(records)
109
84
h [ :id ] = existing . fetch ( h . fetch ( :event_id ) ) . at ( 0 )
110
85
h [ :created_at ] = existing . fetch ( h . fetch ( :event_id ) ) . at ( 1 )
111
86
end
112
- @ event_klass. upsert_all ( hashes )
87
+ event_klass . upsert_all ( hashes )
113
88
end
114
89
end
115
90
116
91
def streams_of ( event_id )
117
- @ repo_reader. streams_of ( event_id )
92
+ repo_reader . streams_of ( event_id )
118
93
end
119
94
120
95
def position_in_stream ( event_id , stream )
121
- @ repo_reader. position_in_stream ( event_id , stream )
96
+ repo_reader . position_in_stream ( event_id , stream )
122
97
end
123
98
124
99
def global_position ( event_id )
125
- @ repo_reader. global_position ( event_id )
100
+ repo_reader . global_position ( event_id )
126
101
end
127
102
128
103
def event_in_stream? ( event_id , stream )
129
- @ repo_reader. event_in_stream? ( event_id , stream )
104
+ repo_reader . event_in_stream? ( event_id , stream )
130
105
end
131
106
132
107
private
133
108
134
109
def add_to_stream ( event_ids , stream , expected_version )
135
110
last_stream_version = -> ( stream_ ) do
136
- @ stream_klass. where ( stream : stream_ . name ) . order ( "position DESC" ) . first . try ( :position )
111
+ stream_klass . where ( stream : stream_ . name ) . order ( "position DESC" ) . first . try ( :position )
137
112
end
138
113
resolved_version = expected_version . resolve_for ( stream , last_stream_version )
139
114
@@ -148,7 +123,7 @@ def add_to_stream(event_ids, stream, expected_version)
148
123
created_at : Time . now . utc ,
149
124
}
150
125
end
151
- @ stream_klass. insert_all! ( in_stream ) unless stream . global?
126
+ stream_klass . insert_all! ( in_stream ) unless stream . global?
152
127
end
153
128
self
154
129
rescue ::ActiveRecord ::RecordNotUnique => e
@@ -165,7 +140,7 @@ def compute_position(resolved_version, index)
165
140
end
166
141
167
142
def detect_index_violated ( message )
168
- @ index_violation_detector. detect ( message )
143
+ index_violation_detector . detect ( message )
169
144
end
170
145
171
146
def insert_hash ( record , serialized_record )
@@ -194,11 +169,11 @@ def optimize_timestamp(valid_at, created_at)
194
169
end
195
170
196
171
def start_transaction ( &block )
197
- @ event_klass. transaction ( requires_new : true , &block )
172
+ event_klass . transaction ( requires_new : true , &block )
198
173
end
199
174
200
175
def link_to_stream_ ( event_ids , stream , expected_version )
201
- ( event_ids - @ event_klass. where ( event_id : event_ids ) . pluck ( :event_id ) ) . each { |id | raise EventNotFound . new ( id ) }
176
+ ( event_ids - event_klass . where ( event_id : event_ids ) . pluck ( :event_id ) ) . each { |id | raise EventNotFound . new ( id ) }
202
177
add_to_stream ( event_ids , stream , expected_version )
203
178
end
204
179
@@ -209,11 +184,50 @@ def append_to_stream_(records, stream, expected_version)
209
184
hashes << insert_hash ( record , record . serialize ( @serializer ) )
210
185
event_ids << record . event_id
211
186
end
212
- add_to_stream ( event_ids , stream , expected_version ) { @event_klass . insert_all! ( hashes ) }
187
+ add_to_stream ( event_ids , stream , expected_version ) { event_klass . insert_all! ( hashes ) }
188
+ end
189
+
190
+ def model_klasses
191
+ @model_klasses ||= @model_factory . call . tap do |event_klass , stream_klass |
192
+ if @serializer == NULL && json_data_type? ( event_klass )
193
+ warn <<~MSG
194
+ The data or metadata column is of a JSON/B type and expects a JSON string.
195
+
196
+ Yet the repository serializer is configured as #{ @serializer } and it would not
197
+ produce the expected JSON string.
198
+
199
+ In ActiveRecord there's an implicit serialization to JSON for JSON/B column types
200
+ that made it work so far. This behaviour is unfortunately also a source of undesired
201
+ double serialization — first in the EventRepository, second in the ActiveRecord.
202
+
203
+ In the past we've advised workarounds that introduced configuration incosistency
204
+ with other data types and serialization formats, i.e. explicitly passing NULL serializer
205
+ just for the JSON/B data types.
206
+
207
+ As of now this special ActiveRecord behaviour is disabled. You should be using JSON
208
+ serializer back again:
209
+
210
+ RubyEventStore::ActiveRecord::EventRepository.new(serializer: JSON)
211
+ MSG
212
+ else
213
+ event_klass . include ( SkipJsonSerialization )
214
+ end
215
+ end
216
+ end
217
+
218
+ def event_klass = model_klasses . first
219
+ def stream_klass = model_klasses . last
220
+
221
+ def repo_reader
222
+ @repo_reader ||= EventRepositoryReader . new ( event_klass , stream_klass , @serializer )
223
+ end
224
+
225
+ def index_violation_detector
226
+ @index_violation_detector ||= IndexViolationDetector . new ( event_klass . table_name , stream_klass . table_name )
213
227
end
214
228
215
- def json_data_type?
216
- %i[ data metadata ] . any? { |attr | @event_klass . column_for_attribute ( attr ) . type . start_with? ( "json" ) }
229
+ def json_data_type? ( klass )
230
+ %i[ data metadata ] . any? { |attr | klass . column_for_attribute ( attr ) . type . start_with? ( "json" ) }
217
231
end
218
232
end
219
233
end
0 commit comments