diff --git a/application/static/application.js b/application/static/application.js index 50b0ed8..2664626 100644 --- a/application/static/application.js +++ b/application/static/application.js @@ -15,7 +15,6 @@ const getClientId = () => { const CONFIG_DEFAULTS = { serviceWorker: './worker.js', - pingInterval: 25000, notificationTimeout: 3000, syncTimeout: 2000, }; @@ -26,11 +25,8 @@ class Application extends Emitter { this.config = { ...CONFIG_DEFAULTS, ...config }; this.state = new Map(); this.worker = null; - const clientId = getClientId(); - this.clientId = clientId; - const protocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; - const url = `${protocol}//${location.host}`; - this.metacom = Metacom.create(url, { clientId }); + this.clientId = getClientId(); + this.metacom = null; this.serviceWorker = this.config.serviceWorker; this.online = navigator.onLine; this.connected = false; @@ -48,56 +44,40 @@ class Application extends Emitter { } #setupServiceWorker() { - const worker = navigator.serviceWorker; - worker.register(this.serviceWorker, { type: 'module' }); - const ping = () => this.post({ type: 'ping' }); - worker.ready.then((registration) => { - setInterval(ping, this.config.pingInterval); + const { serviceWorker } = navigator; + serviceWorker.register(this.serviceWorker, { type: 'module' }); + serviceWorker.ready.then((registration) => { this.worker = registration.active; - const data = { clientId: this.clientId }; - this.post({ type: 'connect', data }); this.#setupMetacom(); }); - worker.addEventListener('message', (event) => { + serviceWorker.addEventListener('message', (event) => { const { type, data } = event.data; this.emit(type, data); }); - this.on('status', (data) => { - this.connected = data.connected; - }); - document.addEventListener('visibilitychange', () => { - this.post({ type: 'ping' }); - }); } async #setupMetacom() { - try { - await this.metacom.load('system'); - const units = await this.metacom.api.system.introspect(['chat']); - this.emit('metacom-ready', { units }); - } catch (err) { - this.emit('metacom-error', { error: err }); - return; - } - try { - await this.metacom.load('chat'); - await this.metacom.api.chat.subscribe({ room: 'sync' }); - this.emit('metacom-ready'); - } catch (err) { - this.emit('metacom-error', { error: err }); - } + const protocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; + const url = `${protocol}//${location.host}`; + const { clientId, worker } = this; + console.log({ clientId, worker }); + this.metacom = Metacom.create(url, { clientId, worker }); + + await this.metacom.load(['system', 'chat']); + await this.metacom.api.chat.subscribe({ room: 'sync' }); + this.connected = true; + this.emit('status', { connected: true }); + this.emit('metacom-ready'); } #setupNetworkStatus() { window.addEventListener('online', () => { this.online = true; - this.post({ type: 'online' }); this.emit('network', { online: true }); }); window.addEventListener('offline', () => { this.online = false; - this.post({ type: 'offline' }); this.emit('network', { online: false }); }); } diff --git a/application/static/domain.js b/application/static/domain.js index 0224028..f444513 100644 --- a/application/static/domain.js +++ b/application/static/domain.js @@ -73,12 +73,9 @@ class ChatApplication extends Application { this.on('install', () => this.showInstallButton(true)); this.on('installed', () => this.showInstallButton(false)); this.on('status', (data) => this.onStatus(data)); - this.on('state', (data) => this.onState(data)); - this.on('username', (data) => this.onUsername(data)); this.on('cacheUpdated', () => this.onCacheUpdated()); this.on('cacheUpdateFailed', (data) => this.onCacheUpdateFailed(data)); this.on('databaseCleared', () => this.onDatabaseCleared()); - this.on('delta', (data) => this.onDelta(data)); this.on('metacom-ready', () => this.setupMetacomEvents()); document.addEventListener('visibilitychange', () => { @@ -99,11 +96,6 @@ class ChatApplication extends Application { const username = this.usernameInput?.value?.trim(); if (!username || username === this.username) return; this.username = username; - if (this.syncTimeout) clearTimeout(this.syncTimeout); - this.syncTimeout = setTimeout(() => { - this.post({ type: 'username', data: this.username }); - this.logger.log('Username auto-synced:', this.username); - }, this.config.syncTimeout); } onStatus(data) { @@ -117,24 +109,6 @@ class ChatApplication extends Application { } } - onState(data) { - this.state.clear(); - if (data && typeof data === 'object') { - for (const [key, value] of Object.entries(data)) { - this.state.set(key, value); - } - } - this.renderChatMessages(); - this.logger.log('State updated from worker'); - } - - onUsername(data) { - this.username = data ?? ''; - if (this.usernameInput) this.usernameInput.value = this.username; - this.logger.log('Username updated from other tab:', data); - this.showNotification('Username updated from other tab: ' + data); - } - onCacheUpdated() { this.logger.log('Cache updated successfully'); this.showNotification('Cache updated successfully!', 'success'); @@ -154,8 +128,6 @@ class ChatApplication extends Application { } onDatabaseCleared() { - this.state.clear(); - this.renderChatMessages(); this.logger.log('Database cleared successfully'); this.showNotification('Database cleared successfully!', 'success'); if (this.clearMessagesBtn) { @@ -208,20 +180,15 @@ class ChatApplication extends Application { } const delta = this.addMessage(content); const deltas = [delta]; - this.post({ type: 'delta', data: deltas }); - if (this.connected) { - try { - await this.metacom.api.chat.applyDelta({ deltas, room: 'sync' }); - } catch (err) { - this.logger.log('Server sync failed:', err); - } + this.renderChatMessages(); + try { + await this.metacom.api.chat.applyDelta({ deltas, room: 'sync' }); this.logger.log('Sent message:', content); this.showNotification('Message sent!', 'success'); - } else { - this.logger.log('Message queued (offline):', content); - this.showNotification('Message queued - will send when online', 'info'); + } catch (err) { + this.logger.log('Failed to send message:', err.message); + this.showNotification('Failed to send message', 'error'); } - this.renderChatMessages(); } renderChatMessages() { @@ -277,7 +244,7 @@ class ChatApplication extends Application { if (!reactionBtns) return; for (const btn of reactionBtns) { const { messageId, reaction } = btn.dataset; - btn.addEventListener('click', () => { + btn.addEventListener('click', async () => { const record = { messageId, reaction }; const delta = { strategy: 'counter', entity: 'reaction', record }; const message = this.state.get(messageId); @@ -287,27 +254,20 @@ class ChatApplication extends Application { message.reactions[reaction] = count + 1; this.renderChatMessages(); } - this.post({ type: 'delta', data: [delta] }); - if (this.metacom?.api?.chat && this.connected) { - this.metacom.api.chat - .applyDelta({ deltas: [delta], room: 'sync' }) - .catch(() => {}); + try { + await this.metacom.api.chat.applyDelta({ + deltas: [delta], + room: 'sync', + }); + const msg = `${reaction} to message:${messageId}`; + this.logger.log('Added reaction:', msg); + } catch (err) { + this.logger.log('Failed to add reaction:', err.message); } - this.logger.log('Added reaction:', reaction, 'to message:', messageId); }); } } - updateCache() { - this.logger.log('Requesting cache update...'); - if (this.updateCacheBtn) { - this.updateCacheBtn.disabled = true; - this.updateCacheBtn.textContent = 'Updating...'; - } - this.showNotification('Cache update requested', 'info'); - this.post({ type: 'updateCache' }); - } - showInstallButton(visible = true) { if (visible) { if (this.installBtn) this.installBtn.classList.remove('hidden'); diff --git a/application/static/metacom.js b/application/static/metacom.js index fddc4b8..a2c1050 100644 --- a/application/static/metacom.js +++ b/application/static/metacom.js @@ -2,6 +2,8 @@ import * as metautil from './metautil.js'; const { Emitter } = metautil; +// chunks-browser.js + const ID_LENGTH_BYTES = 1; const chunkEncode = (id, payload) => { @@ -29,6 +31,8 @@ const chunkDecode = (chunk) => { export { chunkEncode, chunkDecode }; +// streams.js + const PUSH_EVENT = Symbol(); const PULL_EVENT = Symbol(); const DEFAULT_HIGH_WATER_MARK = 32; @@ -176,8 +180,9 @@ class MetaWritable extends Emitter { export { MetaReadable, MetaWritable }; +// metacom.js + const CALL_TIMEOUT = 7 * 1000; -const PING_INTERVAL = 60 * 1000; const RECONNECT_TIMEOUT = 2 * 1000; const toByteView = async (input) => { @@ -208,17 +213,31 @@ class MetacomUnit extends Emitter { class Metacom extends Emitter { static connections = new Set(); + static isOnline = true; static online() { + Metacom.isOnline = true; for (const connection of Metacom.connections) { if (!connection.connected) connection.open(); } } - static offline() {} + static offline() { + Metacom.isOnline = false; + } + + static initialize() { + if (typeof window !== 'undefined') { + window.addEventListener('online', Metacom.online); + window.addEventListener('offline', Metacom.offline); + } else if (typeof self !== 'undefined') { + self.addEventListener('online', Metacom.online); + self.addEventListener('offline', Metacom.offline); + } + } constructor(url, options = {}) { - super(options); + super(); this.url = url; this.socket = null; this.api = {}; @@ -229,19 +248,15 @@ class Metacom extends Emitter { this.opening = null; this.lastActivity = Date.now(); this.callTimeout = options.callTimeout || CALL_TIMEOUT; - this.pingInterval = options.pingInterval || PING_INTERVAL; this.reconnectTimeout = options.reconnectTimeout || RECONNECT_TIMEOUT; this.generateId = options.generateId || metautil.generateId; - this.ping = null; - this.open(); + this.open(options); } static create(url, options) { const { transport } = Metacom; - let Transport = url.startsWith('ws') ? transport.ws : transport.http; - if (typeof navigator !== 'undefined' && navigator.serviceWorker) { - Transport = transport.event; - } + if (options.worker) return transport.event.getInstance(url, options); + const Transport = url.startsWith('ws') ? transport.ws : transport.http; return new Transport(url, options); } @@ -275,14 +290,8 @@ class Metacom extends Emitter { } async message(data) { - if (data === '{}') return; this.lastActivity = Date.now(); - let packet; - try { - packet = JSON.parse(data); - } catch { - return; - } + const packet = JSON.parse(data); const { type, id, name } = packet; if (type === 'event') { const [unit, eventName] = name.split('/'); @@ -290,13 +299,10 @@ class Metacom extends Emitter { if (metacomUnit) metacomUnit.emit(eventName, packet.data); return; } - if (!id) { - console.error(new Error('Packet structure error')); - return; - } + if (!id) throw new Error('Packet structure error'); if (type === 'callback') { const promised = this.calls.get(id); - if (!promised) return; + if (!promised) throw new Error(`Callback ${id} not found`); const [resolve, reject, timeout] = promised; this.calls.delete(id); clearTimeout(timeout); @@ -309,13 +315,13 @@ class Metacom extends Emitter { const stream = this.streams.get(id); if (name && typeof name === 'string' && Number.isSafeInteger(size)) { if (stream) { - console.error(new Error(`Stream ${name} is already initialized`)); + throw new Error(`Stream ${name} is already initialized`); } else { const stream = new MetaReadable(id, name, size); this.streams.set(id, stream); } } else if (!stream) { - console.error(new Error(`Stream ${id} is not initialized`)); + throw new Error(`Stream ${id} is not initialized`); } else if (status === 'end') { await stream.close(); this.streams.delete(id); @@ -323,7 +329,7 @@ class Metacom extends Emitter { await stream.terminate(); this.streams.delete(id); } else { - console.error(new Error('Stream packet structure error')); + throw new Error('Stream packet structure error'); } } } @@ -333,7 +339,7 @@ class Metacom extends Emitter { const { id, payload } = chunkDecode(byteView); const stream = this.streams.get(id); if (stream) await stream.push(payload); - else console.warn(`Stream ${id} is not initialized`); + else throw new Error(`Stream ${id} is not initialized`); } async load(...units) { @@ -354,13 +360,13 @@ class Metacom extends Emitter { } scaffold(unit, ver) { - return (method) => - async (args = {}) => { - const id = this.generateId(); + const id = this.generateId(); + const createMethod = (methodName) => { + const method = async (args = {}) => { const unitName = unit + (ver ? '.' + ver : ''); - const target = unitName + '/' + method; + const target = unitName + '/' + methodName; if (this.opening) await this.opening; - if (!this.connected) await this.open(); + const packet = { type: 'call', id, method: target, args }; return new Promise((resolve, reject) => { const timeout = setTimeout(() => { if (this.calls.has(id)) { @@ -369,10 +375,12 @@ class Metacom extends Emitter { } }, this.callTimeout); this.calls.set(id, [resolve, reject, timeout]); - const packet = { type: 'call', id, method: target, args }; this.send(packet); }); }; + return method; + }; + return createMethod; } } @@ -404,15 +412,6 @@ class WebsocketTransport extends Metacom { socket.close(); }); - if (this.pingInterval) { - this.ping = setInterval(() => { - if (this.active) { - const interval = Date.now() - this.lastActivity; - if (interval > this.pingInterval) this.write('{}'); - } - }, this.pingInterval); - } - this.opening = new Promise((resolve) => { socket.addEventListener('open', () => { this.opening = null; @@ -427,23 +426,20 @@ class WebsocketTransport extends Metacom { close() { this.active = false; Metacom.connections.delete(this); - if (this.ping) clearInterval(this.ping); if (!this.socket) return; this.socket.close(); this.socket = null; } write(data) { - if (!this.connected) return; + if (!this.connected) throw new Error('Not connected'); this.lastActivity = Date.now(); this.socket.send(data); } send(data) { - if (!this.connected) return; - this.lastActivity = Date.now(); const payload = JSON.stringify(data); - this.socket.send(payload); + this.write(payload); } } @@ -472,111 +468,109 @@ class HttpTransport extends Metacom { } class EventTransport extends Metacom { - constructor(url, options = {}) { - super(url, options); - this.worker = null; - this.clientId = options.clientId || metautil.generateId(); - this.messageHandler = null; + static messagePort = null; + static instance = null; + + static getInstance(url, options = {}) { + if (EventTransport.instance) return EventTransport.instance; + EventTransport.instance = new EventTransport(url, options); + return EventTransport.instance; } - async open() { + async open(options = {}) { if (this.opening) return this.opening; if (this.connected) return Promise.resolve(); this.active = true; - Metacom.connections.add(this); - this.opening = new Promise((resolve, reject) => { - if (typeof navigator === 'undefined') { - return void reject(new Error('Navigator not supported')); - } - const { serviceWorker } = navigator; - if (!serviceWorker) { - return void reject(new Error('Service Worker not supported')); - } - serviceWorker.ready.then((registration) => { - this.worker = registration.active; - this.messageHandler = (event) => { - const { type, data } = event.data; - if (type === 'metacom' && data !== undefined) { - if (typeof data === 'string') this.message(data); - else this.binary(data); - } - }; - serviceWorker.addEventListener('message', this.messageHandler); - this.connected = true; - resolve(); + const worker = options.worker || this.worker; + if (!worker) throw new Error('Service Worker not provided'); + this.worker = worker; + this.opening = new Promise((resolve) => { + const { port1, port2 } = new MessageChannel(); + EventTransport.messagePort = port1; + port1.addEventListener('message', (event) => { + const { data } = event; + if (data !== undefined) { + if (typeof data === 'string') this.message(data); + else this.binary(data); + } }); + port1.start(); + this.worker.postMessage({ type: 'metacom:connect' }, [port2]); + this.connected = true; + resolve(); }); return this.opening; } close() { this.active = false; + this.opening = null; Metacom.connections.delete(this); - if (this.ping) clearInterval(this.ping); - if (this.messageHandler) { - const { serviceWorker } = navigator; - serviceWorker.removeEventListener('message', this.messageHandler); - this.messageHandler = null; - } this.connected = false; - this.worker = null; + } + + online() { + if (this.worker) { + this.worker.postMessage({ type: 'metacom:online' }); + } + } + + offline() { + if (this.worker) { + this.worker.postMessage({ type: 'metacom:offline' }); + } } write(data) { - if (!this.worker) return; + if (!EventTransport.messagePort) throw new Error('Not connected'); this.lastActivity = Date.now(); - const { clientId } = this; - this.worker.postMessage({ type: 'metacom', clientId, data }); + EventTransport.messagePort.postMessage(data); } send(data) { - if (!this.worker) return; - this.lastActivity = Date.now(); const payload = JSON.stringify(data); - const { clientId } = this; - this.worker.postMessage({ type: 'metacom', clientId, data: payload }); + this.write(payload); } } class MetacomProxy extends Emitter { constructor(options = {}) { super(options); - this.clients = new Map(); + this.ports = new Map(); this.connection = null; this.url = null; this.callTimeout = options.callTimeout || CALL_TIMEOUT; - this.pingInterval = options.pingInterval || PING_INTERVAL; this.reconnectTimeout = options.reconnectTimeout || RECONNECT_TIMEOUT; this.generateId = options.generateId || metautil.generateId; + if (typeof self !== 'undefined') { + self.addEventListener('message', (event) => { + const { type } = event.data; + if (type?.startsWith('metacom')) this.handleMessage(event); + }); + } } - async ensureConnection() { + async open(options = {}) { if (this.connection) { - return this.connection.connected - ? Promise.resolve() - : this.connection.open(); + if (this.connection.connected) return Promise.resolve(); + return this.connection.open(options); } const protocol = self.location.protocol === 'https:' ? 'wss:' : 'ws:'; this.url = `${protocol}//${self.location.host}`; - const options = { + const opts = { callTimeout: this.callTimeout, - pingInterval: this.pingInterval, reconnectTimeout: this.reconnectTimeout, generateId: this.generateId, }; - this.connection = new WebsocketTransport(this.url, options); + this.connection = new WebsocketTransport(this.url, opts); this.connection.message = async (data) => { - MetacomProxy.broadcast({ type: 'metacom', data }); + this.broadcast(data); }; this.connection.binary = async (input) => { const data = await toByteView(input); - MetacomProxy.broadcast({ type: 'metacom', data }); + this.broadcast(data); }; - return this.connection.open(); - } - - open() { - return this.ensureConnection(); + return this.connection.open(opts); } close() { @@ -587,24 +581,33 @@ class MetacomProxy extends Emitter { } async handleMessage(event) { - const { type, clientId, data } = event.data; - if (type !== 'metacom' || data === undefined) return; - if (!this.clients.has(clientId)) { - this.clients.set(clientId, { clientId }); + const { type } = event.data; + if (type === 'metacom:connect') { + const port = event.ports[0]; + if (!port) throw new Error('MessagePort not provided'); + const portId = this.generateId(); + this.ports.set(portId, port); + port.addEventListener('message', async (messageEvent) => { + const { data } = messageEvent; + if (data === undefined) throw new Error('Message data is undefined'); + await this.open(); + if (!this.connection || !this.connection.connected) { + throw new Error('Not connected to server'); + } + this.connection.write(data); + }); + port.start(); + } else if (type === 'metacom:online') { + Metacom.online(); + } else if (type === 'metacom:offline') { + Metacom.offline(); } - await this.ensureConnection(); - if (!this.connection || !this.connection.connected) return; - this.connection.write(data); } - static async broadcast(message, excludeClient) { - const clients = await self.clients.matchAll({ - includeUncontrolled: true, - type: 'window', - }); - for (const client of clients) { - if (excludeClient && client.id === excludeClient.id) continue; - client.postMessage(message); + broadcast(data, excludePort = null) { + for (const port of this.ports.values()) { + if (port === excludePort) continue; + port.postMessage(data); } } } @@ -615,4 +618,5 @@ Metacom.transport = { event: EventTransport, }; +Metacom.initialize(); export { Metacom, MetacomUnit, MetacomProxy }; diff --git a/application/static/worker.js b/application/static/worker.js index 03c3883..d519c93 100644 --- a/application/static/worker.js +++ b/application/static/worker.js @@ -211,27 +211,6 @@ self.addEventListener('activate', (event) => { }); const events = { - connect: (source, data) => { - syncManager.clientId = data.clientId; - const connected = metacomProxy.connection?.connected || false; - source.postMessage({ type: 'status', data: { connected } }); - const messages = syncManager.getMessages(); - console.log({ messages }); - source.postMessage({ type: 'state', data: messages }); - }, - online: () => metacomProxy.open(), - offline: () => metacomProxy.close(), - delta: (source, data) => { - syncManager.applyDelta(data); - syncManager.lastDeltaId += data.length; - MetacomProxy.broadcast({ type: 'delta', data }, source); - }, - username: (source, data) => { - MetacomProxy.broadcast({ type: 'username', data }); - }, - ping: (source) => { - source.postMessage({ type: 'pong' }); - }, updateCache: async (source) => { try { await cacheManager.update(); @@ -244,8 +223,6 @@ const events = { clearDatabase: async (source) => { try { await syncManager.clearDatabase(); - const messages = syncManager.getMessages(); - MetacomProxy.broadcast({ type: 'state', data: messages }); source.postMessage({ type: 'databaseCleared' }); } catch (error) { const data = { error: error.message }; @@ -255,11 +232,8 @@ const events = { }; self.addEventListener('message', (event) => { + console.log({ serviceWorkerMessage: event }); const { type, data } = event.data; - if (type === 'metacom') { - metacomProxy.handleMessage(event); - return; - } const handler = events[type]; if (handler) handler(event.source, data); });