@@ -126,6 +126,10 @@ class RemoteStoreImpl implements RemoteStore {
126126 */
127127 writePipeline : MutationBatch [ ] = [ ] ;
128128
129+ sentWrites = new WeakSet < MutationBatch > ( ) ;
130+
131+ writeTimeoutId : ReturnType < typeof setTimeout > | null = null ;
132+
129133 /**
130134 * A mapping of watched targets that the client cares about tracking and the
131135 * user has explicitly called a 'listen' for this target.
@@ -734,10 +738,23 @@ function addToWritePipeline(
734738 ) ;
735739 remoteStoreImpl . writePipeline . push ( batch ) ;
736740
737- const writeStream = ensureWriteStream ( remoteStoreImpl ) ;
738- if ( writeStream . isOpen ( ) && writeStream . handshakeComplete ) {
739- writeStream . writeMutations ( batch . mutations ) ;
741+ if ( remoteStoreImpl . writeTimeoutId !== null ) {
742+ return ;
740743 }
744+
745+ remoteStoreImpl . writeTimeoutId = setTimeout ( ( ) => {
746+ remoteStoreImpl . writeTimeoutId = null ;
747+ const writeStream = ensureWriteStream ( remoteStoreImpl ) ;
748+ if ( writeStream . isOpen ( ) && writeStream . handshakeComplete ) {
749+ for ( const curBatch of remoteStoreImpl . writePipeline ) {
750+ if ( remoteStoreImpl . sentWrites . has ( curBatch ) ) {
751+ continue ;
752+ }
753+ writeStream . writeMutations ( curBatch . mutations ) ;
754+ remoteStoreImpl . sentWrites . add ( curBatch ) ;
755+ }
756+ }
757+ } , 200 ) ;
741758}
742759
743760function shouldStartWriteStream ( remoteStoreImpl : RemoteStoreImpl ) : boolean {
@@ -769,6 +786,7 @@ async function onWriteHandshakeComplete(
769786 // Send the write pipeline now that the stream is established.
770787 for ( const batch of remoteStoreImpl . writePipeline ) {
771788 writeStream . writeMutations ( batch . mutations ) ;
789+ remoteStoreImpl . sentWrites . add ( batch ) ;
772790 }
773791}
774792
0 commit comments