Skip to content

Commit be9b913

Browse files
committed
relay: get rid of the message queue, because js is single-threaded.
1 parent c2423f7 commit be9b913

File tree

3 files changed

+103
-213
lines changed

3 files changed

+103
-213
lines changed

abstract-relay.ts

Lines changed: 103 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
import type { Event, EventTemplate, VerifiedEvent, Nostr, NostrEvent } from './core.ts'
44
import { matchFilters, type Filter } from './filter.ts'
55
import { getHex64, getSubscriptionId } from './fakejson.ts'
6-
import { Queue, normalizeURL } from './utils.ts'
6+
import { normalizeURL } from './utils.ts'
77
import { makeAuthEvent } from './nip42.ts'
8-
import { yieldThread } from './helpers.ts'
98

109
type RelayWebSocket = WebSocket & {
1110
ping?(): void
@@ -51,8 +50,6 @@ export class AbstractRelay {
5150
private openCountRequests = new Map<string, CountResolver>()
5251
private openEventPublishes = new Map<string, EventPublishResolver>()
5352
private ws: RelayWebSocket | undefined
54-
private incomingMessageQueue = new Queue<string>()
55-
private queueRunning = false
5653
private challenge: string | undefined
5754
private authPromise: Promise<string> | undefined
5855
private serial: number = 0
@@ -269,19 +266,110 @@ export class AbstractRelay {
269266
}
270267
}
271268

272-
private async runQueue() {
273-
this.queueRunning = true
274-
while (true) {
275-
if (false === this.handleNext()) {
276-
break
269+
public async send(message: string) {
270+
if (!this.connectionPromise) throw new SendingOnClosedConnection(message, this.url)
271+
272+
this.connectionPromise.then(() => {
273+
this.ws?.send(message)
274+
})
275+
}
276+
277+
public async auth(signAuthEvent: (evt: EventTemplate) => Promise<VerifiedEvent>): Promise<string> {
278+
const challenge = this.challenge
279+
if (!challenge) throw new Error("can't perform auth, no challenge was received")
280+
if (this.authPromise) return this.authPromise
281+
282+
this.authPromise = new Promise<string>(async (resolve, reject) => {
283+
try {
284+
let evt = await signAuthEvent(makeAuthEvent(this.url, challenge))
285+
let timeout = setTimeout(() => {
286+
let ep = this.openEventPublishes.get(evt.id) as EventPublishResolver
287+
if (ep) {
288+
ep.reject(new Error('auth timed out'))
289+
this.openEventPublishes.delete(evt.id)
290+
}
291+
}, this.publishTimeout)
292+
this.openEventPublishes.set(evt.id, { resolve, reject, timeout })
293+
this.send('["AUTH",' + JSON.stringify(evt) + ']')
294+
} catch (err) {
295+
console.warn('subscribe auth function failed:', err)
277296
}
278-
await yieldThread()
297+
})
298+
return this.authPromise
299+
}
300+
301+
public async publish(event: Event): Promise<string> {
302+
const ret = new Promise<string>((resolve, reject) => {
303+
const timeout = setTimeout(() => {
304+
const ep = this.openEventPublishes.get(event.id) as EventPublishResolver
305+
if (ep) {
306+
ep.reject(new Error('publish timed out'))
307+
this.openEventPublishes.delete(event.id)
308+
}
309+
}, this.publishTimeout)
310+
this.openEventPublishes.set(event.id, { resolve, reject, timeout })
311+
})
312+
this.send('["EVENT",' + JSON.stringify(event) + ']')
313+
return ret
314+
}
315+
316+
public async count(filters: Filter[], params: { id?: string | null }): Promise<number> {
317+
this.serial++
318+
const id = params?.id || 'count:' + this.serial
319+
const ret = new Promise<number>((resolve, reject) => {
320+
this.openCountRequests.set(id, { resolve, reject })
321+
})
322+
this.send('["COUNT","' + id + '",' + JSON.stringify(filters).substring(1))
323+
return ret
324+
}
325+
326+
public subscribe(
327+
filters: Filter[],
328+
params: Partial<SubscriptionParams> & { label?: string; id?: string },
329+
): Subscription {
330+
const sub = this.prepareSubscription(filters, params)
331+
sub.fire()
332+
333+
if (params.abort) {
334+
params.abort.onabort = () => sub.close(String(params.abort!.reason || '<aborted>'))
279335
}
280-
this.queueRunning = false
336+
337+
return sub
338+
}
339+
340+
public prepareSubscription(
341+
filters: Filter[],
342+
params: Partial<SubscriptionParams> & { label?: string; id?: string },
343+
): Subscription {
344+
this.serial++
345+
const id = params.id || (params.label ? params.label + ':' : 'sub:') + this.serial
346+
const subscription = new Subscription(this, id, filters, params)
347+
this.openSubs.set(id, subscription)
348+
return subscription
281349
}
282350

283-
private handleNext(): undefined | false {
284-
const json = this.incomingMessageQueue.dequeue()
351+
public close() {
352+
this.skipReconnection = true
353+
if (this.reconnectTimeoutHandle) {
354+
clearTimeout(this.reconnectTimeoutHandle)
355+
this.reconnectTimeoutHandle = undefined
356+
}
357+
if (this.pingIntervalHandle) {
358+
clearInterval(this.pingIntervalHandle)
359+
this.pingIntervalHandle = undefined
360+
}
361+
this.closeAllSubscriptions('relay connection closed by us')
362+
this._connected = false
363+
this.onclose?.()
364+
if (this.ws?.readyState === this._WebSocket.OPEN) {
365+
this.ws?.close()
366+
}
367+
}
368+
369+
// this is the function assigned to this.ws.onmessage
370+
// it's exposed for testing and debugging purposes
371+
public _onmessage(ev: MessageEvent<any>) {
372+
const json = ev.data
285373
if (!json) {
286374
return false
287375
}
@@ -381,118 +469,11 @@ export class AbstractRelay {
381469
}
382470
}
383471
} catch (err) {
472+
const [_, __, event] = JSON.parse(json)
473+
;(window as any).printer.maybe(event.pubkey, ':: caught err', event, this.url, err)
384474
return
385475
}
386476
}
387-
388-
public async send(message: string) {
389-
if (!this.connectionPromise) throw new SendingOnClosedConnection(message, this.url)
390-
391-
this.connectionPromise.then(() => {
392-
this.ws?.send(message)
393-
})
394-
}
395-
396-
public async auth(signAuthEvent: (evt: EventTemplate) => Promise<VerifiedEvent>): Promise<string> {
397-
const challenge = this.challenge
398-
if (!challenge) throw new Error("can't perform auth, no challenge was received")
399-
if (this.authPromise) return this.authPromise
400-
401-
this.authPromise = new Promise<string>(async (resolve, reject) => {
402-
try {
403-
let evt = await signAuthEvent(makeAuthEvent(this.url, challenge))
404-
let timeout = setTimeout(() => {
405-
let ep = this.openEventPublishes.get(evt.id) as EventPublishResolver
406-
if (ep) {
407-
ep.reject(new Error('auth timed out'))
408-
this.openEventPublishes.delete(evt.id)
409-
}
410-
}, this.publishTimeout)
411-
this.openEventPublishes.set(evt.id, { resolve, reject, timeout })
412-
this.send('["AUTH",' + JSON.stringify(evt) + ']')
413-
} catch (err) {
414-
console.warn('subscribe auth function failed:', err)
415-
}
416-
})
417-
return this.authPromise
418-
}
419-
420-
public async publish(event: Event): Promise<string> {
421-
const ret = new Promise<string>((resolve, reject) => {
422-
const timeout = setTimeout(() => {
423-
const ep = this.openEventPublishes.get(event.id) as EventPublishResolver
424-
if (ep) {
425-
ep.reject(new Error('publish timed out'))
426-
this.openEventPublishes.delete(event.id)
427-
}
428-
}, this.publishTimeout)
429-
this.openEventPublishes.set(event.id, { resolve, reject, timeout })
430-
})
431-
this.send('["EVENT",' + JSON.stringify(event) + ']')
432-
return ret
433-
}
434-
435-
public async count(filters: Filter[], params: { id?: string | null }): Promise<number> {
436-
this.serial++
437-
const id = params?.id || 'count:' + this.serial
438-
const ret = new Promise<number>((resolve, reject) => {
439-
this.openCountRequests.set(id, { resolve, reject })
440-
})
441-
this.send('["COUNT","' + id + '",' + JSON.stringify(filters).substring(1))
442-
return ret
443-
}
444-
445-
public subscribe(
446-
filters: Filter[],
447-
params: Partial<SubscriptionParams> & { label?: string; id?: string },
448-
): Subscription {
449-
const sub = this.prepareSubscription(filters, params)
450-
sub.fire()
451-
452-
if (params.abort) {
453-
params.abort.onabort = () => sub.close(String(params.abort!.reason || '<aborted>'))
454-
}
455-
456-
return sub
457-
}
458-
459-
public prepareSubscription(
460-
filters: Filter[],
461-
params: Partial<SubscriptionParams> & { label?: string; id?: string },
462-
): Subscription {
463-
this.serial++
464-
const id = params.id || (params.label ? params.label + ':' : 'sub:') + this.serial
465-
const subscription = new Subscription(this, id, filters, params)
466-
this.openSubs.set(id, subscription)
467-
return subscription
468-
}
469-
470-
public close() {
471-
this.skipReconnection = true
472-
if (this.reconnectTimeoutHandle) {
473-
clearTimeout(this.reconnectTimeoutHandle)
474-
this.reconnectTimeoutHandle = undefined
475-
}
476-
if (this.pingIntervalHandle) {
477-
clearInterval(this.pingIntervalHandle)
478-
this.pingIntervalHandle = undefined
479-
}
480-
this.closeAllSubscriptions('relay connection closed by us')
481-
this._connected = false
482-
this.onclose?.()
483-
if (this.ws?.readyState === this._WebSocket.OPEN) {
484-
this.ws?.close()
485-
}
486-
}
487-
488-
// this is the function assigned to this.ws.onmessage
489-
// it's exposed for testing and debugging purposes
490-
public _onmessage(ev: MessageEvent<any>) {
491-
this.incomingMessageQueue.enqueue(ev.data as string)
492-
if (!this.queueRunning) {
493-
this.runQueue()
494-
}
495-
}
496477
}
497478

498479
export class Subscription {

helpers.ts

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,5 @@
11
import { verifiedSymbol, type Event, type Nostr, VerifiedEvent } from './core.ts'
22

3-
export async function yieldThread() {
4-
return new Promise<void>((resolve, reject) => {
5-
try {
6-
// Check if MessageChannel is available
7-
if (typeof MessageChannel !== 'undefined') {
8-
const ch = new MessageChannel()
9-
const handler = () => {
10-
// @ts-ignore (typescript thinks this property should be called `removeListener`, but in fact it's `removeEventListener`)
11-
ch.port1.removeEventListener('message', handler)
12-
resolve()
13-
}
14-
// @ts-ignore (typescript thinks this property should be called `addListener`, but in fact it's `addEventListener`)
15-
ch.port1.addEventListener('message', handler)
16-
ch.port2.postMessage(0)
17-
ch.port1.start()
18-
} else {
19-
if (typeof setImmediate !== 'undefined') {
20-
setImmediate(resolve)
21-
} else if (typeof setTimeout !== 'undefined') {
22-
setTimeout(resolve, 0)
23-
} else {
24-
// Last resort - resolve immediately
25-
resolve()
26-
}
27-
}
28-
} catch (e) {
29-
console.error('during yield: ', e)
30-
reject(e)
31-
}
32-
})
33-
}
34-
353
export const alwaysTrue: Nostr['verifyEvent'] = (t: Event): t is VerifiedEvent => {
364
t[verifiedSymbol] = true
375
return true

utils.ts

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -123,62 +123,3 @@ export function mergeReverseSortedLists(list1: NostrEvent[], list2: NostrEvent[]
123123

124124
return result
125125
}
126-
127-
export class QueueNode<V> {
128-
public value: V
129-
public next: QueueNode<V> | null = null
130-
public prev: QueueNode<V> | null = null
131-
132-
constructor(message: V) {
133-
this.value = message
134-
}
135-
}
136-
137-
export class Queue<V> {
138-
public first: QueueNode<V> | null
139-
public last: QueueNode<V> | null
140-
141-
constructor() {
142-
this.first = null
143-
this.last = null
144-
}
145-
146-
enqueue(value: V): boolean {
147-
const newNode = new QueueNode(value)
148-
if (!this.last) {
149-
// list is empty
150-
this.first = newNode
151-
this.last = newNode
152-
} else if (this.last === this.first) {
153-
// list has a single element
154-
this.last = newNode
155-
this.last.prev = this.first
156-
this.first.next = newNode
157-
} else {
158-
// list has elements, add as last
159-
newNode.prev = this.last
160-
this.last.next = newNode
161-
this.last = newNode
162-
}
163-
return true
164-
}
165-
166-
dequeue(): V | null {
167-
if (!this.first) return null
168-
169-
if (this.first === this.last) {
170-
const target = this.first
171-
this.first = null
172-
this.last = null
173-
return target.value
174-
}
175-
176-
const target = this.first
177-
this.first = target.next
178-
if (this.first) {
179-
this.first.prev = null // fix: clean up prev pointer
180-
}
181-
182-
return target.value
183-
}
184-
}

0 commit comments

Comments
 (0)