@@ -40,6 +40,7 @@ import type {
40
40
StoredMessage ,
41
41
Configuration ,
42
42
} from "@cocalc/conat/persist/storage" ;
43
+ import type { Changefeed } from "@cocalc/conat/persist/client" ;
43
44
export type { Configuration } ;
44
45
import { join } from "path" ;
45
46
import {
@@ -171,6 +172,7 @@ export class CoreStream<T = any> extends EventEmitter {
171
172
private storage : StorageOptions ;
172
173
private client ?: Client ;
173
174
private persistClient : PersistStreamClient ;
175
+ private changefeed ?: Changefeed ;
174
176
private service ?: string ;
175
177
176
178
constructor ( {
@@ -302,15 +304,15 @@ export class CoreStream<T = any> extends EventEmitter {
302
304
}
303
305
await until (
304
306
async ( ) => {
305
- if ( this . isClosed ( ) ) {
306
- return true ;
307
- }
308
307
let messages : StoredMessage [ ] = [ ] ;
309
308
let changes : ( SetOperation | DeleteOperation | StoredMessage ) [ ] = [ ] ;
310
309
try {
311
310
if ( this . isClosed ( ) ) {
312
311
return true ;
313
312
}
313
+ if ( this . changefeed == null ) {
314
+ this . changefeed = await this . persistClient . changefeed ( ) ;
315
+ }
314
316
// console.log("get persistent stream", { start_seq }, this.storage);
315
317
messages = await this . persistClient . getAll ( {
316
318
start_seq,
@@ -553,64 +555,31 @@ export class CoreStream<T = any> extends EventEmitter {
553
555
// log("core-stream: listen", this.storage);
554
556
await until (
555
557
async ( ) => {
556
- if ( this . client == null ) {
558
+ if ( this . isClosed ( ) ) {
557
559
return true ;
558
560
}
559
561
try {
560
- //log("core-stream: START listening on changefeed", this.storage);
561
- const changefeed = await this . persistClient . changefeed ( ) ;
562
- this . persistClient . on ( "changefeed" , ( updates ) => {
562
+ if ( this . changefeed == null ) {
563
+ this . changefeed = await this . persistClient . changefeed ( ) ;
564
+ if ( this . isClosed ( ) ) {
565
+ return true ;
566
+ }
567
+ }
568
+
569
+ for await ( const updates of this . changefeed ) {
563
570
this . processPersistentMessages ( updates , {
564
571
noEmit : false ,
565
572
noSeqCheck : false ,
566
573
} ) ;
567
- } ) ;
568
-
569
- // Now that we have the changefeed running, we grab any messages that
570
- // might have been missed between the last getAll and when the
571
- // changefeed was running -- this is usually empty, but there are race
572
- // condition where it could be non-empty. The result would be that
573
- // this stream is not aware of a change that happened until we get one
574
- // more change from the changefeed, and then it is aware (so no data loss,
575
- // but it is confusingly hung until something happens). Thus this is
576
- // important to do. This problem arises when opening a file for the
577
- // first time, where the project and browser both open it at almost the
578
- // exact same time and the project inserts a change (the first patch) exactly
579
- // as the browser finished the initial connect and right before the changefeed
580
- // is initialized.
581
- const messages = await this . persistClient . getAll ( {
582
- start_seq : this . lastSeq + 1 ,
583
- timeout : DEFAULT_GET_ALL_TIMEOUT ,
584
- } ) ;
585
- this . processPersistentMessages ( messages , {
586
- noEmit : false ,
587
- noSeqCheck : false ,
588
- } ) ;
589
-
590
- // log(
591
- // this.client.id,
592
- // "core-stream: listening on the changefeed...",
593
- // this.storage,
594
- // );
595
- for await ( const _ of changefeed ) {
596
- // if (this.log) {
597
- // log(
598
- // this.persistClient.id,
599
- // this.client.id,
600
- // "core-stream: changefeed",
601
- // this.storage,
602
- // updates,
603
- // );
604
- // }
605
- if ( this . client == null ) {
574
+ if ( this . isClosed ( ) ) {
606
575
return true ;
607
576
}
608
- // this.processPersistentMessages(updates, {
609
- // noEmit: false,
610
- // noSeqCheck: false,
611
- // });
612
577
}
613
578
} catch ( err ) {
579
+ // There should never be a case where the changefeed throws
580
+ // an error or ends without this whole streaming being closed.
581
+ // If that happens its an unexpected bug. Instead of failing,
582
+ // we log this, loop around, and make a new changefeed.
614
583
// This normally doesn't happen but could if a persist server is being restarted
615
584
// frequently or things are seriously broken. We cause this in
616
585
// backend/conat/test/core/core-stream-break.test.ts
@@ -621,16 +590,17 @@ export class CoreStream<T = any> extends EventEmitter {
621
590
) ;
622
591
}
623
592
}
593
+
594
+ delete this . changefeed ;
595
+
624
596
// above loop exits when the persistent server
625
597
// stops sending messages for some reason. In that
626
- // case we reconnect, picking up where we left off:
598
+ // case we reconnect, picking up where we left off.
599
+
627
600
if ( this . client == null ) {
628
601
return true ;
629
602
}
630
- // log(
631
- // "core-stream: get missing from when changefeed ended",
632
- // this.storage,
633
- // );
603
+
634
604
await this . getAllFromPersist ( {
635
605
start_seq : this . lastSeq + 1 ,
636
606
noEmit : false ,
0 commit comments