pubsub / live data pattern #29
Hey, I'm curious what it looks like to use this tool to set up a 'live data' type connection. It's mentioned that "Supports bidirectional calling. The client can call the server, and the server can also call the client." I figured this would be simple given the WebSocket implementation, but I wasn't able to get it hooked up, and I'm not sure it's possible without polling. There's a massive likelihood I've not wired it up correctly. I've added the full reproduction example for reference, but here is the gist of it:

Server

class LiveCounter extends RpcTarget implements CounterServer {
  private count = 0;
  private clients: CounterClient[] = [];
  async get() {
    return this.count;
  }
  async increment() {
    this.count++;
    console.log('[SERVER] Increment', this.count);
    this.notify();
  }
  async notify() {
    this.clients.forEach((client) => {
      try {
        console.log('[SERVER] Notifying client', this.count);
        client.onUpdate(this.count);
      } catch (err) {
        console.error('[SERVER] Error notifying client', err);
      }
    });
  }
  async subscribe(client: CounterClient) {
    this.clients.push(client);
  }
}

Client

import { newWebSocketRpcSession, RpcTarget } from 'capnweb';
import { CounterClient, CounterServer } from './types.js';
class CounterClientImpl extends RpcTarget implements CounterClient {
  private currentValue = 0;
  async onUpdate(newVal: number): Promise<void> {
    this.currentValue = newVal;
    console.log(`[CLIENT] updated: ${newVal}`);
  }
  async getCurrentValue(): Promise<number> {
    return this.currentValue;
  }
}
const client = new CounterClientImpl();
async function main() {
  console.log("[CLIENT] Connecting to Cap'n Web counter server...");
  try {
    // Connect to the WebSocket server
    const api = newWebSocketRpcSession<CounterServer>('ws://localhost:8080');
    await api.subscribe(client);
    await api.increment();
    await api.increment();
    await api.increment();
    const valueFromServer = await api.get();
    console.log(`[CLIENT] re-fetch: ${valueFromServer}`);
    const currentValue = await client.getCurrentValue();
    console.log(`[CLIENT] cached: ${currentValue}`);
  } catch (error) {
    console.error('Error:', error);
    process.exit(1);
  }
}
main().catch(console.error);

Types

export interface CounterClient {
  onUpdate(newVal: number): void;
}
export interface CounterServer {
  get(): Promise<number>;
  increment(): Promise<void>;
  subscribe(client: CounterClient): void;
  notify(): Promise<void>;
}

I understand if this is outside the scope of this dependency. Thanks, really liking it so far 🙂
Replies: 1 comment 5 replies
I think there's a bug here:

subscribe(client: CounterClient) {
  client.onUpdate(this.count);
}

Don't you need to do BTW you should also probably use
Ah, another problem is that any stubs passed as arguments to your RPC method will be disposed as soon as it returns. To keep copies of them long term, you need to call .dup() on them.

This is covered in the readme.
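To make the lifetime rule concrete, here is a small toy model, not capnweb itself. `MockStub`, `deliverCall`, `tryNotify`, and `runDemo` are hypothetical names invented for this sketch; the only thing taken from the reply is the idea that an argument stub is disposed when the method returns and that `.dup()` yields a copy that outlives the call. In the question's `subscribe`, the corresponding change would presumably be to store `client.dup()` rather than `client`.

```typescript
// Toy model of the stub lifetime rule. MockStub and deliverCall are
// hypothetical stand-ins written for this sketch; they are NOT capnweb APIs.
type UpdateHandler = (value: number) => void;

class MockStub {
  private disposed = false;
  private readonly handler: UpdateHandler;

  constructor(handler: UpdateHandler) {
    this.handler = handler;
  }

  // Hands out a second, independently disposable handle to the same handler.
  dup(): MockStub {
    this.assertLive();
    return new MockStub(this.handler);
  }

  dispose(): void {
    this.disposed = true;
  }

  onUpdate(value: number): void {
    this.assertLive();
    this.handler(value);
  }

  private assertLive(): void {
    if (this.disposed) throw new Error("stub is disposed");
  }
}

// Models the RPC layer: the argument stub is disposed once the method returns.
function deliverCall<T>(method: (stub: MockStub) => T, stub: MockStub): T {
  try {
    return method(stub);
  } finally {
    stub.dispose();
  }
}

// Calls the stub and reports either success or the error message.
function tryNotify(stub: MockStub, value: number): string {
  try {
    stub.onUpdate(value);
    return "delivered";
  } catch (err) {
    return (err as Error).message;
  }
}

function runDemo(): { withoutDup: string; withDup: string; received: number[] } {
  const received: number[] = [];
  const handler: UpdateHandler = (v) => {
    received.push(v);
  };

  // "subscribe" that keeps the argument itself: dead as soon as the call ends.
  const savedRaw = deliverCall((stub) => stub, new MockStub(handler));
  // "subscribe" that keeps a dup: the copy survives the end of the call.
  const savedDup = deliverCall((stub) => stub.dup(), new MockStub(handler));

  return {
    withoutDup: tryNotify(savedRaw, 1),
    withDup: tryNotify(savedDup, 2),
    received,
  };
}

console.log(runDemo());
```

In this model only the second notification reaches the handler, which mirrors the symptom in the question: the server logs that it is notifying, but the client never sees an update.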
Also, if you were properly catching exceptions from the callback and reporting them, it would have told you the problem is that it is disposed. However, in order to catch exceptions from the callback, you have to await it. Currently it looks like you have it in a try/catch but you did not await it, so the try/catch ends before the callback actually throws.

(This would also normally show up as an "unhandled rejection", which you could be monito…
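
The difference between the two try/catch shapes can be seen in plain TypeScript, with no RPC library involved. This is only a sketch: `failingClient`, `notifyWithoutAwait`, `notifyWithAwait`, and `demo` are hypothetical names, and the error message is made up to resemble the disposed-stub case. The `.catch` in the first function exists solely so the sketch can record the escaped rejection; without it, the rejection would go unhandled.

```typescript
// Hypothetical callback that always fails asynchronously, like a disposed stub.
const failingClient = {
  async onUpdate(_value: number): Promise<void> {
    throw new Error("stub is disposed");
  },
};

// Mirrors the question's notify(): the call is inside try/catch but not awaited.
// An async function never throws synchronously, so the catch block cannot fire.
function notifyWithoutAwait(value: number, log: string[]): Promise<void> {
  try {
    const pending = failingClient.onUpdate(value);
    log.push("try block finished");
    // Attached only to observe the failure in this sketch.
    return pending.catch((err: unknown) => {
      log.push(`escaped try/catch: ${(err as Error).message}`);
    });
  } catch (err) {
    log.push(`caught by try/catch: ${(err as Error).message}`);
    return Promise.resolve();
  }
}

// Awaiting inside the try block turns the rejection into a catchable exception.
async function notifyWithAwait(value: number, log: string[]): Promise<void> {
  try {
    await failingClient.onUpdate(value);
    log.push("try block finished");
  } catch (err) {
    log.push(`caught by try/catch: ${(err as Error).message}`);
  }
}

async function demo(): Promise<{ withoutAwait: string[]; withAwait: string[] }> {
  const withoutAwait: string[] = [];
  const withAwait: string[] = [];
  await notifyWithoutAwait(1, withoutAwait);
  await notifyWithAwait(1, withAwait);
  return { withoutAwait, withAwait };
}

demo().then((logs) => console.log(logs));
```

When notifying several clients, a `for...of` loop with `await` inside the try block (or collecting the promises and awaiting them together) keeps each failure catchable; `forEach` with an un-awaited call does not.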