@@ -48,6 +48,10 @@ function Agent(backend, stream) {
4848 // request if the client disconnects ungracefully. This is a
4949 // map of channel -> id -> request
5050 this . presenceRequests = Object . create ( null ) ;
51+ // Keep track of the latest known Doc version, so that we can avoid fetching
52+ // ops to transform presence if not needed
53+ this . latestDocVersionStreams = Object . create ( null ) ;
54+ this . latestDocVersions = Object . create ( null ) ;
5155
5256 // We need to track this manually to make sure we don't reply to messages
5357 // after the stream was closed.
@@ -108,24 +112,21 @@ Agent.prototype._cleanup = function() {
108112 emitter . destroy ( ) ;
109113 }
110114 this . subscribedQueries = Object . create ( null ) ;
115+
116+ for ( var collection in this . latestDocVersionStreams ) {
117+ var streams = this . latestDocVersionStreams [ collection ] ;
118+ for ( var id in streams ) streams [ id ] . destroy ( ) ;
119+ }
120+ this . latestDocVersionStreams = Object . create ( null ) ;
111121} ;
112122
113123/**
114124 * Passes operation data received on stream to the agent stream via
115125 * _sendOp()
116126 */
117127Agent . prototype . _subscribeToStream = function ( collection , id , stream ) {
118- if ( this . closed ) return stream . destroy ( ) ;
119-
120- var streams = this . subscribedDocs [ collection ] || ( this . subscribedDocs [ collection ] = Object . create ( null ) ) ;
121-
122- // If already subscribed to this document, destroy the previously subscribed stream
123- var previous = streams [ id ] ;
124- if ( previous ) previous . destroy ( ) ;
125- streams [ id ] = stream ;
126-
127128 var agent = this ;
128- stream . on ( 'data' , function ( data ) {
129+ this . _subscribeMapToStream ( this . subscribedDocs , collection , id , stream , function ( data ) {
129130 if ( data . error ) {
130131 // Log then silently ignore errors in a subscription stream, since these
131132 // may not be the client's fault, and they were not the result of a
@@ -135,13 +136,26 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
135136 }
136137 agent . _onOp ( collection , id , data ) ;
137138 } ) ;
139+ } ;
140+
141+ Agent . prototype . _subscribeMapToStream = function ( map , collection , id , stream , dataHandler ) {
142+ if ( this . closed ) return stream . destroy ( ) ;
143+
144+ var streams = map [ collection ] || ( map [ collection ] = Object . create ( null ) ) ;
145+
146+ // If already subscribed to this document, destroy the previously subscribed stream
147+ var previous = streams [ id ] ;
148+ if ( previous ) previous . destroy ( ) ;
149+ streams [ id ] = stream ;
150+
151+ stream . on ( 'data' , dataHandler ) ;
138152 stream . on ( 'end' , function ( ) {
139153 // The op stream is done sending, so release its reference
140- var streams = agent . subscribedDocs [ collection ] ;
154+ var streams = map [ collection ] ;
141155 if ( ! streams || streams [ id ] !== stream ) return ;
142156 delete streams [ id ] ;
143157 if ( util . hasKeys ( streams ) ) return ;
144- delete agent . subscribedDocs [ collection ] ;
158+ delete map [ collection ] ;
145159 } ) ;
146160} ;
147161
@@ -794,25 +808,74 @@ Agent.prototype._broadcastPresence = function(presence, callback) {
794808 collection : presence . c
795809 } ;
796810 var start = Date . now ( ) ;
797- backend . trigger ( backend . MIDDLEWARE_ACTIONS . receivePresence , this , context , function ( error ) {
811+
812+ var subscriptionUpdater = presence . p === null ?
813+ this . _unsubscribeDocVersion . bind ( this ) :
814+ this . _subscribeDocVersion . bind ( this ) ;
815+
816+ subscriptionUpdater ( presence . c , presence . d , function ( error ) {
798817 if ( error ) return callback ( error ) ;
799- var requests = presenceRequests [ presence . ch ] || ( presenceRequests [ presence . ch ] = Object . create ( null ) ) ;
800- var previousRequest = requests [ presence . id ] ;
801- if ( ! previousRequest || previousRequest . pv < presence . pv ) {
802- presenceRequests [ presence . ch ] [ presence . id ] = presence ;
803- }
804- backend . transformPresenceToLatestVersion ( agent , presence , function ( error , presence ) {
818+ backend . trigger ( backend . MIDDLEWARE_ACTIONS . receivePresence , agent , context , function ( error ) {
805819 if ( error ) return callback ( error ) ;
806- var channel = agent . _getPresenceChannel ( presence . ch ) ;
807- agent . backend . pubsub . publish ( [ channel ] , presence , function ( error ) {
808- if ( error ) return callback ( error ) ;
809- backend . emit ( 'timing' , 'presence.broadcast' , Date . now ( ) - start , context ) ;
820+ var requests = presenceRequests [ presence . ch ] || ( presenceRequests [ presence . ch ] = Object . create ( null ) ) ;
821+ var previousRequest = requests [ presence . id ] ;
822+ if ( ! previousRequest || previousRequest . pv < presence . pv ) {
823+ presenceRequests [ presence . ch ] [ presence . id ] = presence ;
824+ }
825+
826+ var transformer = function ( agent , presence , callback ) {
810827 callback ( null , presence ) ;
828+ } ;
829+
830+ var latestDocVersion = util . dig ( agent . latestDocVersions , presence . c , presence . d ) ;
831+ var presenceIsUpToDate = presence . v === latestDocVersion ;
832+ if ( ! presenceIsUpToDate ) {
833+ transformer = backend . transformPresenceToLatestVersion . bind ( backend ) ;
834+ }
835+
836+ transformer ( agent , presence , function ( error , presence ) {
837+ if ( error ) return callback ( error ) ;
838+ var channel = agent . _getPresenceChannel ( presence . ch ) ;
839+ agent . backend . pubsub . publish ( [ channel ] , presence , function ( error ) {
840+ if ( error ) return callback ( error ) ;
841+ backend . emit ( 'timing' , 'presence.broadcast' , Date . now ( ) - start , context ) ;
842+ callback ( null , presence ) ;
843+ } ) ;
811844 } ) ;
812845 } ) ;
813846 } ) ;
814847} ;
815848
849+ Agent . prototype . _subscribeDocVersion = function ( collection , id , callback ) {
850+ if ( ! collection || ! id ) return callback ( ) ;
851+
852+ var latestDocVersions = this . latestDocVersions ;
853+ var isSubscribed = util . dig ( latestDocVersions , collection , id ) !== undefined ;
854+ if ( isSubscribed ) return callback ( ) ;
855+
856+ var agent = this ;
857+ this . backend . subscribe ( this , collection , id , null , function ( error , stream , snapshot ) {
858+ if ( error ) return callback ( error ) ;
859+
860+ var versions = latestDocVersions [ collection ] || ( latestDocVersions [ collection ] = Object . create ( null ) ) ;
861+ versions [ id ] = snapshot . v ;
862+
863+ agent . _subscribeMapToStream ( agent . latestDocVersionStreams , collection , id , stream , function ( op ) {
864+ // op.v behind snapshot.v by 1
865+ latestDocVersions [ collection ] [ id ] = op . v + 1 ;
866+ } ) ;
867+
868+ callback ( ) ;
869+ } ) ;
870+ } ;
871+
872+ Agent . prototype . _unsubscribeDocVersion = function ( collection , id , callback ) {
873+ var stream = util . dig ( this . latestDocVersionStreams , collection , id ) ;
874+ if ( stream ) stream . destroy ( ) ;
875+ util . digAndRemove ( this . latestDocVersions , collection , id ) ;
876+ util . nextTick ( callback ) ;
877+ } ;
878+
816879Agent . prototype . _createPresence = function ( request ) {
817880 return {
818881 a : ACTIONS . presence ,
0 commit comments