@@ -76,28 +76,19 @@ func PushPull(
7676 opts PushPullOptions ,
7777) (* ServerPack , error ) {
7878 start := gotime .Now ()
79- defer func () {
80- be .Metrics .ObservePushPullResponseSeconds (gotime .Since (start ).Seconds ())
81- }()
8279
8380 // TODO: Changes may be reordered or missing during communication on the network.
8481 // We should check the change.pack with checkpoint to make sure the changes are in the correct order.
8582 initialServerSeq := docInfo .ServerSeq
8683
8784 // 01. push changes: filter out the changes that are already saved in the database.
8885 cpAfterPush , pushedChanges := pushChanges (ctx , clientInfo , docInfo , reqPack , initialServerSeq )
89- hostname := be .Config .Hostname
90- be .Metrics .AddPushPullReceivedChanges (hostname , project , reqPack .ChangesLen ())
91- be .Metrics .AddPushPullReceivedOperations (hostname , project , reqPack .OperationsLen ())
9286
9387 // 02. pull pack: pull changes or a snapshot from the database and create a response pack.
9488 respPack , err := pullPack (ctx , be , clientInfo , docInfo , reqPack , cpAfterPush , initialServerSeq , opts .Mode )
9589 if err != nil {
9690 return nil , err
9791 }
98- be .Metrics .AddPushPullSentChanges (hostname , project , respPack .ChangesLen ())
99- be .Metrics .AddPushPullSentOperations (hostname , project , respPack .OperationsLen ())
100- be .Metrics .AddPushPullSnapshotBytes (hostname , project , respPack .SnapshotLen ())
10192
10293 // 03. update the client's document and checkpoint.
10394 docRefKey := docInfo .RefKey ()
@@ -115,7 +106,7 @@ func PushPull(
115106 }
116107 }
117108
118- // 04. store pushed changes, docInfo and checkpoint of the client to DB.
109+ // 04. store the pushed changes and update the document info in DB.
119110 if len (pushedChanges ) > 0 || reqPack .IsRemoved {
120111 if err := be .DB .CreateChangeInfos (
121112 ctx ,
@@ -128,12 +119,7 @@ func PushPull(
128119 return nil , err
129120 }
130121 }
131-
132- if ! clientInfo .IsServerClient () {
133- if err := be .DB .UpdateClientInfoAfterPushPull (ctx , clientInfo , docInfo ); err != nil {
134- return nil , err
135- }
136- }
122+ respPack .ApplyDocInfo (docInfo )
137123
138124 // 05. update min version vector of the document.
139125 minVersionVector , err := be .DB .UpdateMinVersionVector (
@@ -149,7 +135,12 @@ func PushPull(
149135 respPack .VersionVector = minVersionVector
150136 }
151137
152- respPack .ApplyDocInfo (docInfo )
138+ // 06. update client's checkpoint of the document in DB.
139+ if ! clientInfo .IsServerClient () {
140+ if err := be .DB .UpdateClientInfoAfterPushPull (ctx , clientInfo , docInfo ); err != nil {
141+ return nil , err
142+ }
143+ }
153144
154145 pullLog := strconv .Itoa (respPack .ChangesLen ())
155146 if respPack .SnapshotLen () > 0 {
@@ -163,8 +154,15 @@ func PushPull(
163154 pullLog ,
164155 gotime .Since (start ),
165156 )
157+ hostname := be .Config .Hostname
158+ be .Metrics .AddPushPullReceivedChanges (hostname , project , reqPack .ChangesLen ())
159+ be .Metrics .AddPushPullReceivedOperations (hostname , project , reqPack .OperationsLen ())
160+ be .Metrics .AddPushPullSentChanges (hostname , project , respPack .ChangesLen ())
161+ be .Metrics .AddPushPullSentOperations (hostname , project , respPack .OperationsLen ())
162+ be .Metrics .AddPushPullSnapshotBytes (hostname , project , respPack .SnapshotLen ())
163+ be .Metrics .ObservePushPullResponseSeconds (gotime .Since (start ).Seconds ())
166164
167- // 06 . publish document change event then store snapshot asynchronously.
165+ // 07 . publish document change event then store snapshot asynchronously.
168166 if len (pushedChanges ) > 0 || reqPack .IsRemoved {
169167 be .Background .AttachGoroutine (func (ctx context.Context ) {
170168 publisherID , err := clientInfo .ID .ToActorID ()
@@ -200,30 +198,21 @@ func PushPull(
200198 }
201199 }
202200
203- locker , err := be .Lockers .Locker (ctx , SnapshotKey (project .ID , reqPack .DocumentKey ))
204- if err != nil {
205- logging .From (ctx ).Error (err )
206- return
207- }
208-
209- // NOTE: If the snapshot is already being created by another routine, it
210- // is not necessary to recreate it, so we can skip it.
211- if err := locker .TryLock (ctx ); err != nil {
201+ // NOTE(hackerwins): If the snapshot is already being created by another routine,
202+ // it is not necessary to recreate it, so we can skip it.
203+ locker , ok := be .Lockers .LockerWithTryLock (SnapshotKey (project .ID , reqPack .DocumentKey ))
204+ if ! ok {
212205 return
213206 }
214207 defer func () {
215- if err := locker .Unlock (ctx ); err != nil {
208+ if err := locker .Unlock (); err != nil {
216209 logging .From (ctx ).Error (err )
217210 return
218211 }
219212 }()
220213
221214 start := gotime .Now ()
222- if err := storeSnapshot (
223- ctx ,
224- be ,
225- docInfo ,
226- ); err != nil {
215+ if err := storeSnapshot (ctx , be , docInfo ); err != nil {
227216 logging .From (ctx ).Error (err )
228217 }
229218 be .Metrics .ObservePushPullSnapshotDurationSeconds (
@@ -260,18 +249,18 @@ func BuildInternalDocForServerSeq(
260249 docInfo * database.DocInfo ,
261250 serverSeq int64 ,
262251) (* document.InternalDocument , error ) {
263- docRefKey := docInfo .RefKey ()
252+ docKey := docInfo .RefKey ()
264253
265254 // NOTE(hackerwins): If the document is already in the cache, we can skip
266255 // the database query and use the cached document. If the document's server
267256 // sequence in the cache is greater than the given server sequence, we can't
268257 // build the document from the document. In this case, we need to
269258 // query the database to get the closest snapshot information.
270- doc , ok := be .SnapshotCache .Get (docRefKey )
259+ doc , ok := be .SnapshotCache .Get (docKey )
271260 if ! ok || serverSeq < doc .Checkpoint ().ServerSeq {
272261 snapshotInfo , err := be .DB .FindClosestSnapshotInfo (
273262 ctx ,
274- docRefKey ,
263+ docKey ,
275264 serverSeq ,
276265 true ,
277266 )
@@ -293,7 +282,7 @@ func BuildInternalDocForServerSeq(
293282
294283 changes , err := be .DB .FindChangesBetweenServerSeqs (
295284 ctx ,
296- docRefKey ,
285+ docKey ,
297286 doc .Checkpoint ().ServerSeq + 1 ,
298287 serverSeq ,
299288 )
@@ -314,7 +303,7 @@ func BuildInternalDocForServerSeq(
314303 if ! be .Config .SnapshotDisableGC {
315304 vector , err := be .DB .GetMinVersionVector (
316305 ctx ,
317- docRefKey ,
306+ docKey ,
318307 doc .VersionVector (),
319308 )
320309 if err != nil {
@@ -330,7 +319,7 @@ func BuildInternalDocForServerSeq(
330319 if err != nil {
331320 return nil , err
332321 }
333- be .SnapshotCache .Add (docRefKey , clone )
322+ be .SnapshotCache .Add (docKey , clone )
334323
335324 if logging .Enabled (zap .DebugLevel ) {
336325 logging .From (ctx ).Debugf (
0 commit comments