@@ -21,7 +21,6 @@ import {
21
21
import { SyncDoc } from "./sync-doc" ;
22
22
import { SyncTable } from "@cocalc/sync/table/synctable" ;
23
23
import { Client } from "./types" ;
24
- import { delay } from "awaiting" ;
25
24
import { debounce } from "lodash" ;
26
25
import sha1 from "sha1" ;
27
26
@@ -37,7 +36,6 @@ type Value = { [key: string]: any };
37
36
// Also, we don't implement complete object delete yet so instead we
38
37
// set the data field to null, which clears all state about and
39
38
// object and makes it easy to know to ignore it.
40
- // We also use this time for deleting ephemeral messages.
41
39
const GC_DEBOUNCE_MS = 10000 ;
42
40
43
41
// If for some reason GC needs to be deleted, e.g., maybe you
@@ -48,6 +46,9 @@ const GC_DEBOUNCE_MS = 10000;
48
46
// collection.
49
47
const DISABLE_GC = false ;
50
48
49
+ // ignore messages past this age.
50
+ const MAX_MESSAGE_TIME_MS = 10000 ;
51
+
51
52
interface CommMessage {
52
53
header : { msg_id : string } ;
53
54
parent_header : { msg_id : string } ;
@@ -74,7 +75,9 @@ export class IpywidgetsState extends EventEmitter {
74
75
// This should be done in conjunction with the main table (with gc
75
76
// on backend, and with change to null event on the frontend).
76
77
private buffers : {
77
- [ model_id : string ] : { [ path : string ] : { buffer : Buffer ; hash : string } } ;
78
+ [ model_id : string ] : {
79
+ [ path : string ] : { buffer : Buffer ; hash : string } ;
80
+ } ;
78
81
} = { } ;
79
82
// Similar but used on frontend
80
83
private arrayBuffers : {
@@ -227,7 +230,7 @@ export class IpywidgetsState extends EventEmitter {
227
230
buffers, which is a problem I don't think upstream ipywidgets
228
231
has to solve.
229
232
*/
230
- get_model_buffers = async (
233
+ getModelBuffers = async (
231
234
model_id : string ,
232
235
) : Promise < {
233
236
buffer_paths : string [ ] [ ] ;
@@ -258,19 +261,8 @@ export class IpywidgetsState extends EventEmitter {
258
261
buffers . push ( cur . buffer ) ;
259
262
return ;
260
263
}
261
- // async get of the buffer efficiently via HTTP:
262
- if ( this . client . ipywidgetsGetBuffer == null ) {
263
- throw Error (
264
- "NotImplementedError: frontend client must implement ipywidgetsGetBuffer in order to support binary buffers" ,
265
- ) ;
266
- }
267
264
try {
268
- const buffer = await this . client . ipywidgetsGetBuffer (
269
- this . syncdoc . project_id ,
270
- auxFileToOriginal ( this . syncdoc . path ) ,
271
- model_id ,
272
- path ,
273
- ) ;
265
+ const buffer = await this . clientGetBuffer ( model_id , path ) ;
274
266
this . arrayBuffers [ model_id ] [ path ] = { buffer, hash } ;
275
267
buffer_paths . push ( JSON . parse ( path ) ) ;
276
268
buffers . push ( buffer ) ;
@@ -279,40 +271,81 @@ export class IpywidgetsState extends EventEmitter {
279
271
}
280
272
} ;
281
273
// Run f in parallel on all of the keys of value:
282
- await Promise . all ( value . keySeq ( ) . toJS ( ) . map ( f ) ) ;
274
+ await Promise . all (
275
+ value
276
+ . keySeq ( )
277
+ . toJS ( )
278
+ . filter ( ( path ) => path . startsWith ( "[" ) )
279
+ . map ( f ) ,
280
+ ) ;
283
281
return { buffers, buffer_paths } ;
284
282
} ;
285
283
284
+ private clientGetBuffer = async ( model_id : string , path : string ) => {
285
+ // async get of the buffer efficiently via HTTP:
286
+ if ( this . client . ipywidgetsGetBuffer == null ) {
287
+ throw Error (
288
+ "NotImplementedError: frontend client must implement ipywidgetsGetBuffer in order to support binary buffers" ,
289
+ ) ;
290
+ }
291
+ return await this . client . ipywidgetsGetBuffer (
292
+ this . syncdoc . project_id ,
293
+ auxFileToOriginal ( this . syncdoc . path ) ,
294
+ model_id ,
295
+ path ,
296
+ ) ;
297
+ } ;
298
+
286
299
// Used on the backend by the project http server
287
- getBuffer = ( model_id : string , buffer_path : string ) : Buffer | undefined => {
300
+ getBuffer = (
301
+ model_id : string ,
302
+ buffer_path_or_sha1 : string ,
303
+ ) : Buffer | undefined => {
288
304
const dbg = this . dbg ( "getBuffer" ) ;
289
- dbg ( "getBuffer" , model_id , buffer_path ) ;
290
- return this . buffers [ model_id ] ?. [ buffer_path ] ?. buffer ;
305
+ dbg ( "getBuffer" , model_id , buffer_path_or_sha1 ) ;
306
+ return this . buffers [ model_id ] ?. [ buffer_path_or_sha1 ] ?. buffer ;
291
307
} ;
292
308
309
+ // returns the sha1 hashes of the buffers
293
310
private set_model_buffers = (
311
+ // model that buffers are associated to:
294
312
model_id : string ,
295
- buffer_paths : string [ ] [ ] ,
313
+ // if given, these are buffers with given paths; if not given, we
314
+ // store buffer associated to sha1 (which is used for custom messages)
315
+ buffer_paths : string [ ] [ ] | undefined ,
316
+ // the actual buffers.
296
317
buffers : Buffer [ ] ,
297
318
fire_change_event : boolean = true ,
298
- ) : void => {
319
+ ) : string [ ] => {
299
320
const dbg = this . dbg ( "set_model_buffers" ) ;
300
321
dbg ( "buffer_paths = " , buffer_paths ) ;
301
322
302
323
const data : { [ path : string ] : boolean } = { } ;
303
324
if ( this . buffers [ model_id ] == null ) {
304
325
this . buffers [ model_id ] = { } ;
305
326
}
306
- for ( let i = 0 ; i < buffer_paths . length ; i ++ ) {
307
- const key = JSON . stringify ( buffer_paths [ i ] ) ;
308
- // we set to the sha1 of the buffer not just to make getting
309
- // the buffer easy, but to make it easy to KNOW if we
310
- // even need to get the buffer.
311
- const hash = sha1 ( buffers [ i ] ) ;
312
- data [ key ] = hash ;
313
- this . buffers [ model_id ] [ key ] = { buffer : buffers [ i ] , hash } ;
327
+ const hashes : string [ ] = [ ] ;
328
+ if ( buffer_paths != null ) {
329
+ for ( let i = 0 ; i < buffer_paths . length ; i ++ ) {
330
+ const key = JSON . stringify ( buffer_paths [ i ] ) ;
331
+ // we set to the sha1 of the buffer not just to make getting
332
+ // the buffer easy, but to make it easy to KNOW if we
333
+ // even need to get the buffer.
334
+ const hash = sha1 ( buffers [ i ] ) ;
335
+ hashes . push ( hash ) ;
336
+ data [ key ] = hash ;
337
+ this . buffers [ model_id ] [ key ] = { buffer : buffers [ i ] , hash } ;
338
+ }
339
+ } else {
340
+ for ( const buffer of buffers ) {
341
+ const hash = sha1 ( buffer ) ;
342
+ hashes . push ( hash ) ;
343
+ this . buffers [ model_id ] [ hash ] = { buffer, hash } ;
344
+ data [ hash ] = hash ;
345
+ }
314
346
}
315
347
this . set ( model_id , "buffers" , data , fire_change_event ) ;
348
+ return hashes ;
316
349
} ;
317
350
318
351
/*
@@ -477,6 +510,8 @@ scat.x, scat.y = np.random.rand(2, 50)
477
510
const [ string_id , model_id , type ] = JSON . parse ( key ) ;
478
511
if ( ! activeIds . has ( model_id ) ) {
479
512
// Delete this model from the table (or as close to delete as we have).
513
+ // This removes the last message, state, buffer info, and value,
514
+ // depending on type.
480
515
this . table . set (
481
516
{ string_id, type, model_id, data : null } ,
482
517
"none" ,
@@ -709,16 +744,25 @@ scat.x, scat.y = np.random.rand(2, 50)
709
744
message,
710
745
buffers : `${ buffers ?. length ?? "no" } buffers` ,
711
746
} ) ;
747
+ let buffer_hashes : string [ ] ;
712
748
if (
713
749
buffers != null &&
714
750
buffers . length > 0 &&
715
751
content . data . buffer_paths == null
716
752
) {
717
753
// TODO
718
- dbg ( "custom message -- there are BUFFERS -- NOT implemented!!" ) ;
754
+ dbg ( "custom message -- there are BUFFERS -- saving them" ) ;
755
+ buffer_hashes = this . set_model_buffers (
756
+ model_id ,
757
+ undefined ,
758
+ buffers ,
759
+ false ,
760
+ ) ;
761
+ } else {
762
+ buffer_hashes = [ ] ;
719
763
}
720
764
// We now send the message.
721
- this . sendCustomMessage ( model_id , message , false ) ;
765
+ this . sendCustomMessage ( model_id , message , buffer_hashes , false ) ;
722
766
break ;
723
767
724
768
case "echo_update" :
@@ -900,32 +944,49 @@ with out:
900
944
private sendCustomMessage = async (
901
945
model_id : string ,
902
946
message : object ,
947
+ buffer_hashes : string [ ] ,
903
948
fire_change_event : boolean = true ,
904
949
) : Promise < void > => {
905
950
/*
906
951
Send a custom message.
907
952
908
953
It's not at all clear what this should even mean in the context of
909
954
realtime collaboration, and there will likely be clients where
910
- this is bad. But for now, we just make the message available
911
- via the table for a few seconds, then remove it. Any clients
912
- that are connected while we do this can react, and any that aren't
913
- just don't get the message (which is presumably fine).
955
+ this is bad. But for now, we just make the last message sent
956
+ available via the table, and each successive message overwrites the previous
957
+ one. Any clients that are connected while we do this can react,
958
+ and any that aren't just don't get the message (which is presumably fine).
914
959
915
960
Some widgets like ipympl use this to initialize state, so when a new
916
961
client connects, it requests a message describing the plot, and everybody
917
962
receives it.
918
963
*/
919
964
920
- this . set ( model_id , "message" , message , fire_change_event ) ;
921
- await delay ( GC_DEBOUNCE_MS ) ;
922
- // Actually, delete is not implemented for synctable, so for
923
- // now we just set it to an empty message.
924
- this . set ( model_id , "message" , { } , fire_change_event ) ;
965
+ this . set (
966
+ model_id ,
967
+ "message" ,
968
+ { message, buffer_hashes, time : Date . now ( ) } ,
969
+ fire_change_event ,
970
+ ) ;
925
971
} ;
926
972
927
- get_message = ( model_id : string ) => {
928
- return this . get ( model_id , "message" ) ?. toJS ( ) ;
973
+ // Return the most recent message for the given model.
974
+ getMessage = async (
975
+ model_id : string ,
976
+ ) : Promise < { message : object ; buffers : ArrayBuffer [ ] } | undefined > => {
977
+ const x = this . get ( model_id , "message" ) ?. toJS ( ) ;
978
+ if ( x == null ) {
979
+ return undefined ;
980
+ }
981
+ if ( Date . now ( ) - ( x . time ?? 0 ) >= MAX_MESSAGE_TIME_MS ) {
982
+ return undefined ;
983
+ }
984
+ const { message, buffer_hashes } = x ;
985
+ let buffers : ArrayBuffer [ ] = [ ] ;
986
+ for ( const hash of buffer_hashes ) {
987
+ buffers . push ( await this . clientGetBuffer ( model_id , hash ) ) ;
988
+ }
989
+ return { message, buffers } ;
929
990
} ;
930
991
}
931
992
0 commit comments