Skip to content

Commit 4a4098a

Browse files
nicoschmdtpatrickelectric
authored andcommitted
frontend: zenoh: add query and subscriber handlers
1 parent 4a91270 commit 4a4098a

1 file changed

Lines changed: 56 additions & 1 deletion

File tree

core/frontend/src/libs/zenoh/index.ts

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Config, Session } from '@eclipse-zenoh/zenoh-ts'
1+
import { Config, QueryTarget, Receiver, RecvErr, Sample, Session, Subscriber } from '@eclipse-zenoh/zenoh-ts'
22

33
class ZenohManager {
44
private static instance: ZenohManager
@@ -30,6 +30,61 @@ class ZenohManager {
3030
}
3131
return this.sessionPromise
3232
}
33+
34+
public async query(key: string, target: QueryTarget, timeout: number = 30000) : Promise<any | null> {
35+
const session = await this.getSession()
36+
if (!session) {
37+
console.error('Zenoh session not initialized')
38+
return null
39+
}
40+
41+
const receiver: Receiver | void = session.get(key, {
42+
target,
43+
})
44+
45+
if (!(receiver instanceof Receiver)) {
46+
console.error('Failed to create query receiver. No queryable found or connection error.')
47+
return null
48+
}
49+
50+
const timeoutPromise = new Promise<null>((resolve) => {
51+
setTimeout(() => resolve(null), timeout)
52+
})
53+
54+
const replyPromise = receiver.receive()
55+
const reply = await Promise.race([replyPromise, timeoutPromise])
56+
57+
if (reply === null || reply === RecvErr.Disconnected) {
58+
console.error('Query timeout: No response from zenoh queryable. '
59+
+ 'The service may be unavailable or the extension may not exist.')
60+
return null
61+
}
62+
63+
if (!reply || typeof (reply as any).result !== 'function') {
64+
console.error('Unexpected reply from zenoh queryable:', reply)
65+
return null
66+
}
67+
68+
const payload = (reply as { result: () => Sample }).result()
69+
try {
70+
return JSON.parse(payload.payload().to_string())
71+
} catch (error) {
72+
console.error('Error parsing response:', error)
73+
return null
74+
}
75+
}
76+
77+
public async subscriber(topic: string, handler: (sample: Sample) => Promise<void>) : Promise<Subscriber | null> {
78+
const session = await this.getSession()
79+
if (!session) {
80+
console.error('Zenoh session not initialized')
81+
return null
82+
}
83+
84+
return session.declare_subscriber(topic, {
85+
handler,
86+
})
87+
}
3388
}
3489

3590
const zenoh = ZenohManager.getInstance()

0 commit comments

Comments
 (0)