Skip to content

Commit b5949ca

Browse files
committed
feat: bridge/messagebus interop
1 parent 22cd36f commit b5949ca

File tree

1 file changed

+58
-2
lines changed

1 file changed

+58
-2
lines changed

src/MessageBus.js

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ function getGlobalThis() {
1414
}
1515

1616
/**
17-
* @returns {{ ee: EventEmitter; sink: Sink; }}
17+
* @returns {{ ee: EventEmitter; sink: Sink; bridge?: import("@podium/bridge").PodiumBridge }}
1818
*/
1919
function getGlobalObjects() {
2020
let objs = getGlobalThis()['@podium'];
@@ -38,9 +38,10 @@ function getGlobalObjects() {
3838

3939
export default class MessageBus {
4040
constructor() {
41-
const { ee, sink } = getGlobalObjects();
41+
const { ee, sink, bridge } = getGlobalObjects();
4242
this.ee = ee;
4343
this.sink = sink;
44+
this.bridge = bridge;
4445
}
4546

4647
/**
@@ -81,9 +82,35 @@ export default class MessageBus {
8182
const event = new Event(channel, topic, payload);
8283
this.ee.emit(event.toKey(), event);
8384
this.sink.push(event);
85+
if (this.bridge) {
86+
/** @type {T | T[]} */
87+
let params = payload;
88+
89+
if (typeof payload !== 'undefined') {
90+
// JSON RPC 2.0 requires that params is either an object or an array. Wrap primitives in an an array.
91+
const isPrimitive =
92+
typeof params === 'string' ||
93+
typeof params === 'boolean' ||
94+
typeof params === 'number';
95+
if (isPrimitive) {
96+
params = [payload];
97+
}
98+
}
99+
100+
this.bridge.notification({
101+
method: `${channel}/${topic}`,
102+
params,
103+
});
104+
}
84105
return event;
85106
}
86107

108+
/**
109+
* Saves a reference to the event handlers that wrap the API for @podium/bridge, so we can unsubscribe later.
110+
* @type {Map<MessageHandler<any>, import('@podium/bridge').EventHandler<any>>}
111+
*/
112+
#bridgeMap = new Map();
113+
87114
/**
88115
* Subscribe to messages for a channel and topic.
89116
*
@@ -101,6 +128,28 @@ export default class MessageBus {
101128
* ```
102129
*/
103130
subscribe(channel, topic, listener) {
131+
if (this.bridge) {
132+
// If there's a bridge, add a listener for the channel and topic there
133+
// and translate incoming messages to a @podium/browser Event for the
134+
// same API surface in userland.
135+
136+
/** @type {import('@podium/bridge').EventHandler<T>} */
137+
const bridgeListener = (message) => {
138+
const request =
139+
/** @type {import("@podium/bridge").RpcRequest<T>} */ (
140+
message
141+
);
142+
143+
const event = new Event(channel, topic, request.params);
144+
this.sink.push(event);
145+
listener(event);
146+
};
147+
this.bridge.on(`${channel}/${topic}`, bridgeListener);
148+
149+
// Save a reference to the bridgeListener so we can unsubscribe later.
150+
this.#bridgeMap.set(listener, bridgeListener);
151+
}
152+
104153
this.ee.on(toKey(channel, topic), listener);
105154
}
106155

@@ -126,5 +175,12 @@ export default class MessageBus {
126175
*/
127176
unsubscribe(channel, topic, listener) {
128177
this.ee.off(toKey(channel, topic), listener);
178+
179+
if (this.bridge) {
180+
const bridgeListener = this.#bridgeMap.get(listener);
181+
if (bridgeListener) {
182+
this.bridge.off(`${channel}/${topic}`, bridgeListener);
183+
}
184+
}
129185
}
130186
}

0 commit comments

Comments
 (0)