@@ -130,28 +130,8 @@ func (s *NotificationProjector) project(
130130 }
131131 }()
132132
133- // Decode or initialize projection state
134- var projectionState interface {}
135- if rawState .Position == 0 {
136- // This is the fist time the projection runs so initialize the state
137- projectionState , err = s .projectionStateInit (ctx )
138- if err != nil {
139- return err
140- }
141- } else if s .projectionStateDecode != nil {
142- // Unmarshal the projection state
143- projectionState , err = s .projectionStateDecode (rawState .ProjectionState )
144- if err != nil {
145- return err
146- }
147- }
148- state := driverSQL.ProjectionState {
149- Position : rawState .Position ,
150- ProjectionState : projectionState ,
151- }
152-
153133 // project event stream
154- if err := s .projectStream (ctx , conn , notification , state , eventStream ); err != nil {
134+ if err := s .projectStream (ctx , conn , notification , rawState , eventStream ); err != nil {
155135 return err
156136 }
157137
@@ -163,9 +143,13 @@ func (s *NotificationProjector) projectStream(
163143 ctx context.Context ,
164144 conn driverSQL.Execer ,
165145 notification * driverSQL.ProjectionNotification ,
166- state driverSQL.ProjectionState ,
146+ rawState * driverSQL.ProjectionRawState ,
167147 stream goengine.EventStream ,
168148) error {
149+ var (
150+ state driverSQL.ProjectionState
151+ stateAcquired bool
152+ )
169153 for stream .Next () {
170154 // Check if the context is expired
171155 select {
@@ -179,7 +163,6 @@ func (s *NotificationProjector) projectStream(
179163 if err != nil {
180164 return err
181165 }
182- state .Position = msgNumber
183166
184167 // Resolve the payload event name
185168 eventName , err := s .resolver .ResolveName (msg .Payload ())
@@ -196,7 +179,17 @@ func (s *NotificationProjector) projectStream(
196179 continue
197180 }
198181
182+ // Acquire the state if we have none
183+ if ! stateAcquired {
184+ state , err = s .acquireProjectState (ctx , rawState )
185+ if err != nil {
186+ return err
187+ }
188+ stateAcquired = true
189+ }
190+
199191 // Execute the handler
192+ state .Position = msgNumber
200193 state .ProjectionState , err = handler (ctx , state .ProjectionState , msg )
201194 if err != nil {
202195 return err
@@ -211,6 +204,24 @@ func (s *NotificationProjector) projectStream(
211204 return stream .Err ()
212205}
213206
207+ func (s * NotificationProjector ) acquireProjectState (ctx context.Context , rawState * driverSQL.ProjectionRawState ) (driverSQL.ProjectionState , error ) {
208+ state := driverSQL.ProjectionState {
209+ Position : rawState .Position ,
210+ }
211+
212+ // Decode or initialize projection state
213+ var err error
214+ if rawState .Position == 0 {
215+ // This is the fist time the projection runs so initialize the state
216+ state .ProjectionState , err = s .projectionStateInit (ctx )
217+ } else if s .projectionStateDecode != nil {
218+ // Unmarshal the projection state
219+ state .ProjectionState , err = s .projectionStateDecode (rawState .ProjectionState )
220+ }
221+
222+ return state , err
223+ }
224+
214225// wrapProjectionHandlers wraps the projection handlers so that any error or panic is caught and returned
215226func wrapProjectionHandlers (handlers map [string ]goengine.MessageHandler ) map [string ]goengine.MessageHandler {
216227 res := make (map [string ]goengine.MessageHandler , len (handlers ))
0 commit comments