Skip to content

Commit 92bed80

Browse files
Michael Barnes and rkistner authored
Updates to test-client concurrent-connections command (#359)
* Implemented the new print argument for the concurrent-connections test-client command * Readme updates * Fixed typos in the readme * Tweak default print behavior. * Rewrite concurrent-connections clients - now resumable. --------- Co-authored-by: Ralf Kistner <[email protected]>
1 parent 7d495e7 commit 92bed80

File tree

13 files changed

+367
-186
lines changed

13 files changed

+367
-186
lines changed

pnpm-lock.yaml

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test-client/README.md

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,34 @@ This is a minimal client demonstrating direct usage of the HTTP stream sync API.
44

55
For a full implementation, see our client SDKs.
66

7-
## Usage
7+
## Setup
88

9-
```sh
10-
# In project root
9+
1. Install dependencies in the project root
10+
11+
```shell
12+
# In project root directory
1113
pnpm install
1214
pnpm build:packages
13-
# In this folder
15+
```
16+
17+
2. Build the test-client in the `test-client` directory
18+
19+
```shell
20+
# In the test-client directory
1421
pnpm build
15-
node dist/bin.js fetch-operations --token <token> --endpoint http://localhost:8080
22+
```
1623

17-
# More examples:
24+
## Usage
25+
26+
### fetch-operations
27+
28+
The `fetch-operations` command downloads data for a single checkpoint, and outputs a normalized form: one CLEAR operation, followed by the latest PUT operation for each row. This normalized form is still split per bucket. The output is not affected by compacting, but can be affected by replication order.
29+
30+
To avoid normalizing the data, use the `--raw` option. This may include additional CLEAR, MOVE, REMOVE and duplicate PUT operations.
31+
32+
```sh
33+
# If the endpoint is not available in the token aud field, add the --endpoint argument
34+
node dist/bin.js fetch-operations --token <token> --endpoint http://localhost:8080
1835

1936
# If the endpoint is present in token aud field, it can be omitted from args:
2037
node dist/bin.js fetch-operations --token <token>
@@ -29,12 +46,35 @@ node dist/bin.js fetch-operations --config path/to/powersync.yaml
2946
node dist/bin.js fetch-operations --config path/to/powersync.yaml --sub test-user
3047
```
3148

32-
The `fetch-operations` command downloads data for a single checkpoint, and outputs a normalized form: one CLEAR operation, followed by the latest PUT operation for each row. This normalized form is still split per bucket. The output is not affected by compacting, but can be affected by replication order.
49+
### generate-token
3350

34-
To avoid normalizing the data, use the `--raw` option. This may include additional CLEAR, MOVE, REMOVE and duplicate PUT operations.
35-
36-
To generate a token without downloading data, use the `generate-token` command:
51+
Used to generate a JWT token based on your current PowerSync YAML config.
3752

3853
```sh
3954
node dist/bin.js generate-token --config path/to/powersync.yaml --sub test-user
4055
```
56+
57+
### concurrent-connections
58+
59+
Use this command to simulate concurrent connections to a PowerSync instance. This can be used for performance benchmarking
60+
and other load-testing use cases. There are two modes available, `websocket` or `http`. By default, the command uses the
61+
`http` mode.
62+
63+
```shell
64+
# Send two concurrent requests to request a download of sync operations using -n to specify the number of connections
65+
node ./dist/bin.js concurrent-connections -n 2 -t <token>
66+
67+
# Send two concurrent requests to request a download of sync operations using websocket mode
68+
node ./dist/bin.js concurrent-connections -n 2 -t <token> -m websocket
69+
```
70+
71+
Once the sync has completed for a connection the command will print the `op_id`, `ops`, `bytes` and `duration`
72+
for each connection.
73+
74+
To see what rows are being synced, specify `--print=id`, or another relevant field. This will be included
75+
in the output as a `data:` array with each checkpoint.
76+
77+
```shell
78+
# Send two concurrent requests and print the name field, as an example.
79+
node ./dist/bin.js concurrent-connections -n 2 -t <token> -p name
80+
```

test-client/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"@powersync/service-core": "workspace:*",
1919
"commander": "^12.0.0",
2020
"jose": "^4.15.1",
21+
"undici": "^7.15.0",
2122
"ws": "^8.18.0",
2223
"yaml": "^2.5.0"
2324
},
@@ -26,4 +27,4 @@
2627
"@types/ws": "~8.2.0",
2728
"typescript": "^5.7.3"
2829
}
29-
}
30+
}

test-client/src/bin.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,20 @@ program
4040
.option('-u, --sub [sub]', 'sub field for auto-generated token')
4141
.option('-n, --num-clients [num-clients]', 'number of clients to connect')
4242
.option('-m, --mode [mode]', 'http or websocket')
43+
.option('-p, --print [print]', 'print a field from the data being downloaded')
44+
.option('--once', 'stop after the first checkpoint')
4345
.action(async (options) => {
4446
const credentials = await getCredentials(options);
4547

46-
await concurrentConnections(credentials, options['numClients'] ?? 10, options.mode ?? 'http');
48+
await concurrentConnections(
49+
{
50+
...credentials,
51+
once: options.once ?? false,
52+
mode: options.mode
53+
},
54+
options['numClients'] ?? 10,
55+
options.print
56+
);
4757
});
4858

4959
await program.parseAsync();

test-client/src/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export async function getCheckpointData(options: GetCheckpointOptions) {
2929
let data: types.StreamingSyncData[] = [];
3030
let checkpoint: types.StreamingSyncCheckpoint;
3131

32-
for await (let chunk of ndjsonStream<types.StreamingSyncLine>(response.body!)) {
32+
for await (let { chunk } of ndjsonStream<types.StreamingSyncLine>(response.body!)) {
3333
if (isStreamingSyncData(chunk)) {
3434
// Collect data
3535
data.push(chunk);

test-client/src/httpStream.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import type { StreamingSyncLine, StreamingSyncRequest } from '@powersync/service-core';
2+
import { Readable } from 'node:stream';
3+
import { request } from 'undici';
4+
import { ndjsonStream } from './ndjson.js';
5+
import { StreamEvent, SyncOptions } from './stream.js';
6+
7+
export async function* openHttpStream(options: SyncOptions): AsyncGenerator<StreamEvent> {
8+
const streamRequest: StreamingSyncRequest = {
9+
raw_data: true,
10+
client_id: options.clientId,
11+
buckets: [...(options.bucketPositions ?? new Map()).entries()].map(([bucket, after]) => ({
12+
name: bucket,
13+
after: after
14+
}))
15+
};
16+
const response = await request(options.endpoint + '/sync/stream', {
17+
method: 'POST',
18+
headers: {
19+
'Content-Type': 'application/json',
20+
Authorization: `Token ${options.token}`
21+
},
22+
body: JSON.stringify(streamRequest),
23+
signal: options.signal
24+
});
25+
26+
if (response.statusCode != 200) {
27+
throw new Error(`Request failed with code: ${response.statusCode}\n${await response.body.text()}`);
28+
}
29+
30+
const stream = Readable.toWeb(response.body) as ReadableStream<Uint8Array>;
31+
32+
for await (let { chunk, size } of ndjsonStream<StreamingSyncLine>(stream)) {
33+
yield { stats: { decodedBytes: size } };
34+
yield chunk;
35+
}
36+
// If we reach this, the connection was closed without error by the server
37+
}

test-client/src/load-testing/http-worker.ts

Lines changed: 0 additions & 59 deletions
This file was deleted.
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { parentPort, workerData } from 'worker_threads';
2+
import { openStream, SyncOptions } from '../stream.js';
3+
4+
if (parentPort == null) {
5+
throw new Error(`Can only run this script in a worker_thread`);
6+
}
7+
8+
const { i, print } = workerData;
9+
const request: SyncOptions = workerData.request;
10+
11+
let size = 0;
12+
let numOperations = 0;
13+
let lastCheckpointStart = 0;
14+
let printData: string[] = [];
15+
16+
const parseChunk = (chunk: any) => {
17+
if (print == null) {
18+
return;
19+
}
20+
chunk.data.forEach((data: any) => {
21+
if (data.op == 'PUT') {
22+
const payload = JSON.parse(data.data);
23+
printData.push(payload[print]);
24+
}
25+
});
26+
};
27+
28+
for await (let chunk of openStream(request)) {
29+
if ('error' in chunk) {
30+
// Retried automatically
31+
console.error(new Date().toISOString(), i, `Error in stream: ${chunk.error}`);
32+
} else if ('checkpoint_complete' in chunk) {
33+
const duration = performance.now() - lastCheckpointStart;
34+
let message = `checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms`;
35+
if (print) {
36+
message += `, data: [${printData}]`;
37+
}
38+
printData = [];
39+
console.log(new Date().toISOString(), i, message);
40+
41+
if (request.once) {
42+
break;
43+
}
44+
} else if ('data' in chunk) {
45+
parseChunk(chunk.data);
46+
numOperations += chunk.data.data.length;
47+
} else if ('checkpoint' in chunk) {
48+
lastCheckpointStart = performance.now();
49+
console.log(new Date().toISOString(), i, `checkpoint buckets: ${chunk.checkpoint.buckets.length}`);
50+
} else if ('checkpoint_diff' in chunk) {
51+
lastCheckpointStart = performance.now();
52+
console.log(
53+
new Date().toISOString(),
54+
i,
55+
`checkpoint_diff removed_buckets: ${chunk.checkpoint_diff.removed_buckets.length} updated_buckets: ${chunk.checkpoint_diff.updated_buckets.length}`
56+
);
57+
} else if ('stats' in chunk) {
58+
size += chunk.stats.decodedBytes;
59+
} else {
60+
const key = Object.keys(chunk)[0];
61+
if (key != 'token_expires_in' && key != 'data') {
62+
console.log(new Date().toISOString(), i, key);
63+
}
64+
}
65+
}
Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,24 @@
11
import { Worker } from 'worker_threads';
2-
import { Credentials } from '../auth.js';
2+
import { SyncOptions } from '../stream.js';
33

44
export type Mode = 'http' | 'websocket';
55

6-
export async function stream(i: number, credentials: Credentials, mode: Mode) {
7-
const worker =
8-
mode == 'websocket'
9-
? new Worker(new URL('./rsocket-worker.js', import.meta.url), {
10-
workerData: { i, token: credentials.token, url: credentials.endpoint.replace(/^http/, 'ws') }
11-
})
12-
: new Worker(new URL('./http-worker.js', import.meta.url), {
13-
workerData: { i, token: credentials.token, url: credentials.endpoint }
14-
});
6+
export async function stream(i: number, request: SyncOptions, print: string | undefined) {
7+
const worker = new Worker(new URL('./load-test-worker.js', import.meta.url), {
8+
workerData: { i, request, print }
9+
});
1510
await new Promise((resolve, reject) => {
1611
worker.on('message', (event) => resolve(event));
1712
worker.on('error', (err) => reject(err));
13+
worker.on('exit', (__code) => {
14+
resolve(null);
15+
});
1816
});
1917
worker.terminate();
2018
}
2119

22-
export async function streamForever(i: number, credentials: Credentials, mode: Mode) {
23-
while (true) {
24-
try {
25-
await stream(i, credentials, mode);
26-
console.log(new Date().toISOString(), i, 'Stream ended');
27-
} catch (e) {
28-
console.error(new Date().toISOString(), i, e.message);
29-
await new Promise((resolve) => setTimeout(resolve, 1000 + Math.random()));
30-
}
31-
}
32-
}
33-
34-
export async function concurrentConnections(credentials: Credentials, numClients: number, mode: Mode) {
20+
export async function concurrentConnections(options: SyncOptions, numClients: number, print: string | undefined) {
3521
for (let i = 0; i < numClients; i++) {
36-
streamForever(i, credentials, mode);
22+
stream(i, { ...options, clientId: options.clientId ?? `load-test-${i}` }, print);
3723
}
3824
}

0 commit comments

Comments
 (0)