Skip to content

Commit c533cf9

Browse files
committed
Merge remote-tracking branch 'origin/master' into conat-optimize1
2 parents bebf04c + 978125e commit c533cf9

File tree

6 files changed

+89
-17
lines changed

6 files changed

+89
-17
lines changed

src/packages/conat/core/client.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,14 +1450,13 @@ export class Client extends EventEmitter {
14501450
sync = {
14511451
dkv: async <T,>(opts: DKVOptions): Promise<DKV<T>> =>
14521452
await dkv<T>({ ...opts, client: this }),
1453-
akv: async <T,>(opts: DKVOptions): Promise<AKV<T>> =>
1454-
await akv<T>({ ...opts, client: this }),
1453+
akv: <T,>(opts: DKVOptions): AKV<T> => akv<T>({ ...opts, client: this }),
14551454
dko: async <T,>(opts: DKVOptions): Promise<DKO<T>> =>
14561455
await dko<T>({ ...opts, client: this }),
14571456
dstream: async <T,>(opts: DStreamOptions): Promise<DStream<T>> =>
14581457
await dstream<T>({ ...opts, client: this }),
1459-
astream: async <T,>(opts: DStreamOptions): Promise<AStream<T>> =>
1460-
await astream<T>({ ...opts, client: this }),
1458+
astream: <T,>(opts: DStreamOptions): AStream<T> =>
1459+
astream<T>({ ...opts, client: this }),
14611460
synctable: async (opts: SyncTableOptions): Promise<ConatSyncTable> =>
14621461
await createSyncTable({ ...opts, client: this }),
14631462
};

src/packages/conat/hub/api/sync.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,43 @@
11
import { authFirst } from "./util";
22

3+
export interface Patch {
4+
seq: number;
5+
time: number;
6+
mesg: {
7+
time: number;
8+
wall: number;
9+
patch: string;
10+
user_id: number;
11+
is_snapshot?: boolean;
12+
parents: number[];
13+
version?: number;
14+
};
15+
}
16+
17+
export interface HistoryInfo {
18+
doctype: string;
19+
init: { time: Date; size: number; error: string };
20+
last_active: Date;
21+
path: string;
22+
project_id: string;
23+
read_only: boolean;
24+
save: {
25+
state: string;
26+
error: string;
27+
hash: number;
28+
time: number;
29+
expected_hash: number;
30+
};
31+
string_id: string;
32+
users: string[];
33+
}
34+
335
export interface Sync {
436
history: (opts: {
537
account_id?: string;
638
project_id: string;
739
path: string;
8-
}) => Promise<any[]>;
40+
}) => Promise<{ patches: Patch[] }>;
941
}
1042

1143
export const sync = {

src/packages/conat/hub/changefeeds/util.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,18 @@ export const SUBJECT = "changefeeds.*";
66
// If the user refreshes their browser, it is still about a minute
77
// before all the changefeeds they had open are free (due to the
88
// SERVER_KEEPALIVE time below).
9-
export const MAX_PER_ACCOUNT = 500;
10-
export const MAX_GLOBAL = 10000;
9+
export const MAX_PER_ACCOUNT = 1_000;
10+
export const MAX_GLOBAL = 50_000;
1111

1212
const DEBUG_DEVEL_MODE = false;
1313

14-
export let CLIENT_KEEPALIVE = 90000;
15-
export let SERVER_KEEPALIVE = 45000;
16-
export let KEEPALIVE_TIMEOUT = 10000;
14+
export let CLIENT_KEEPALIVE = 90_000;
15+
export let SERVER_KEEPALIVE = 45_000;
16+
export let KEEPALIVE_TIMEOUT = 15_000;
1717

1818
if (DEBUG_DEVEL_MODE) {
1919
console.log(
20-
"*** WARNING: Using DEBUB_DEVEL_MODE changefeed parameters!! ***",
20+
"*** WARNING: Using DEBUG_DEVEL_MODE changefeed parameters!! ***",
2121
);
2222
CLIENT_KEEPALIVE = 6000;
2323
SERVER_KEEPALIVE = 3000;

src/packages/conat/sync/akv.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,15 @@ export class AKV<T = any> {
141141
});
142142
};
143143

144+
// WARNING/TODO: this getAll implementation is not at all clever!
145+
// getAll = async () => {
146+
// const v: { [key: string]: T } = {};
147+
// for (const key of await this.keys()) {
148+
// v[key] = await this.get(key);
149+
// }
150+
// return v;
151+
// };
152+
144153
sqlite = async (
145154
statement: string,
146155
params?: any[],

src/packages/conat/sync/astream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ export class AStream<T = any> {
9999
// or network is flaky, but will return all data properly in order
100100
// then throw an exception with code 503 rather than returning data
101101
// with something skipped.
102-
async *getAll(opts): AsyncGenerator<
102+
async *getAll(opts?): AsyncGenerator<
103103
{
104104
mesg: T;
105105
headers?: Headers;
Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,52 @@
1-
//import { conat } from "@cocalc/backend/conat";
1+
import { conat } from "@cocalc/backend/conat";
22
import isCollaborator from "@cocalc/server/projects/is-collaborator";
3+
import { patchesStreamName } from "@cocalc/conat/sync/synctable-stream";
4+
import { type Patch, type HistoryInfo } from "@cocalc/conat/hub/api/sync";
5+
import { client_db } from "@cocalc/util/db-schema/client-db";
36

47
export async function history({
58
account_id,
69
project_id,
710
path,
11+
start_seq = 0,
12+
end_seq,
813
}: {
914
account_id?: string;
1015
project_id: string;
1116
path: string;
12-
}): Promise<any[]> {
17+
start_seq?: number;
18+
end_seq?: number;
19+
}): Promise<{ patches: Patch[]; info: HistoryInfo }> {
1320
if (!account_id || !(await isCollaborator({ account_id, project_id }))) {
1421
throw Error("user must be collaborator on source project");
1522
}
1623

17-
// const client = conat();
18-
// this will be much easier once the fs2 branch is merged
19-
throw Error(`not implemented yet -- can't get history of ${path} yet`);
24+
const client = conat();
25+
const name = patchesStreamName({ project_id, path });
26+
const astream = client.sync.astream({
27+
name,
28+
project_id,
29+
noInventory: true,
30+
});
31+
const patches: Patch[] = [];
32+
for await (const patch of await astream.getAll({
33+
start_seq,
34+
end_seq,
35+
})) {
36+
patches.push(patch as any);
37+
}
38+
39+
const akv = client.sync.akv({
40+
name: `__dko__syncstrings:${client_db.sha1(project_id, path)}`,
41+
project_id,
42+
noInventory: true,
43+
});
44+
const keys = await akv.keys();
45+
const info: Partial<HistoryInfo> = {};
46+
for (const key of keys) {
47+
if (key[0] != "[") continue;
48+
info[JSON.parse(key)[1]] = await akv.get(key);
49+
}
50+
51+
return { patches, info: info as HistoryInfo };
2052
}

0 commit comments

Comments
 (0)