Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions .dockerignore

This file was deleted.

2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
node_modules
/log/
/application/tasks
/application/data
*.log
*.done
*.pem
.DS_Store
data/
7 changes: 6 additions & 1 deletion .prettierignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
application/tasks/
/application/tasks
/application/data
*.log
*.done
*.pem
.DS_Store
package-lock.json
package.json
5 changes: 3 additions & 2 deletions application/api/chat/applyDelta.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
({
access: 'public',

method: async ({ deltas }) => {
await domain.sync.applyDelta(deltas);
method: async ({ deltas, room }) => {
const excludeClient = context?.client;
await domain.sync.applyDelta(deltas, room, excludeClient);
return true;
},
});
2 changes: 1 addition & 1 deletion application/api/chat/getRoom.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
access: 'public',

method: async ({ name }) => {
domain.chat.getRoom(name);
await domain.chat.getRoom(name);
return { name };
},
});
4 changes: 2 additions & 2 deletions application/api/chat/getState.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
({
access: 'public',

method: async () => {
const state = await domain.sync.getState();
method: async ({ room }) => {
const state = await domain.sync.getState(room);
return { state };
},
});
3 changes: 1 addition & 2 deletions application/api/chat/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
access: 'public',

method: async ({ room }) => {
const clients = domain.chat.getRoom(room);
const clients = await domain.chat.getRoom(room);
clients.add(context.client);
context.client.on('close', () => {
clients.delete(context.client);
if (clients.size === 0) domain.chat.dropRoom(room);
});
return true;
},
Expand Down
1 change: 0 additions & 1 deletion application/api/chat/unsubscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
const clients = domain.chat.rooms.get(room);
if (!clients) throw new Error(`Room ${room} is not found`);
clients.delete(context.client);
if (clients.size === 0) domain.chat.dropRoom(room);
return true;
},
});
Empty file removed application/db/.gitkeep
Empty file.
7 changes: 7 additions & 0 deletions application/db/globalstorage/start.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
async () => {
if (application.worker.id === 'W1') {
console.debug('Open globalstorage');
}
const dataPath = node.path.join(process.cwd(), 'application/data');
db.globalstorage = await npm.globalstorage.open({ path: dataPath });
};
55 changes: 39 additions & 16 deletions application/domain/chat.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
({
rooms: new Map(),
SYNC_ID: 'xxii-chat',

getRoom(name) {
async getRoom(name) {
let room = domain.chat.rooms.get(name);
if (room) return room;
const id = 'room:' + name;
const exists = await db.globalstorage.has(id);
if (!exists) {
await db.globalstorage.set(id, { name, createdAt: Date.now() });
}
room = new Set();
domain.chat.rooms.set(name, room);
return room;
},

dropRoom(name) {
domain.chat.rooms.delete(name);
},

send(name, message) {
const room = domain.chat.rooms.get(name);
if (!room) throw new Error(`Room ${name} is not found`);
Expand All @@ -22,19 +22,42 @@
}
},

async getStorage() {
const dataPath = node.path.join(process.cwd(), 'data', 'chat');
return await npm.globalstorage.open({ path: dataPath });
async getMessage(id) {
return await db.globalstorage.get(id);
},

async setMessage(data) {
const exists = await db.globalstorage.has(data.id);
if (exists) return;
await db.globalstorage.set(data.id, data);
},

async updateMessage(id, delta) {
await db.globalstorage.update(id, delta);
},

async getMessageIds(room) {
const key = 'room:' + room + ':messages';
const record = await db.globalstorage.get(key);
return record?.ids ?? [];
},

async addMessageId(room, id) {
const key = 'room:' + room + ':messages';
const ids = await domain.chat.getMessageIds(room);
if (ids.includes(id)) return;
ids.push(id);
await db.globalstorage.set(key, { ids });
},

async load() {
const storage = await domain.chat.getStorage();
const data = await storage.get(domain.chat.SYNC_ID);
return data || { messages: {}, deltas: [] };
async getDeltas() {
const record = await db.globalstorage.get('sync:deltas');
return record?.deltas ?? [];
},

async save(data) {
const storage = await domain.chat.getStorage();
await storage.set(domain.chat.SYNC_ID, data);
async appendDeltas(deltas) {
const existing = await domain.chat.getDeltas();
const deltasRecord = { deltas: existing.concat(deltas) };
await db.globalstorage.set('sync:deltas', deltasRecord);
},
});
47 changes: 32 additions & 15 deletions application/domain/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,49 @@
}
},

async applyDelta(deltasToApply) {
const data = await domain.chat.load();
const messages = data.messages || {};
const deltas = data.deltas || [];
async applyDelta(deltasToApply, room, excludeClient) {
for (const delta of deltasToApply) {
this.applyCRDT(messages, delta);
deltas.push(delta);
const { strategy, entity, record } = delta;
if (entity === 'message' && strategy === 'lww') {
const existing = await domain.chat.getMessage(record.id);
const base = existing ?? {};
const merged = { ...base, ...record };
await domain.chat.setMessage(merged);
await domain.chat.addMessageId(room, record.id);
} else if (entity === 'reaction' && strategy === 'counter') {
const messageId = record.messageId;
const message = await domain.chat.getMessage(messageId);
if (!message) return;
const reactions = { ...message.reactions };
const count = reactions[record.reaction] || 0;
reactions[record.reaction] = count + 1;
await domain.chat.updateMessage(messageId, { reactions });
}
}
await domain.chat.save({ messages, deltas });
await domain.chat.appendDeltas(deltasToApply);

const packet = { type: 'delta', data: deltasToApply };
const room = domain.chat.getRoom('sync');
if (room && room.size > 0) {
for (const client of room) {
const clients = await domain.chat.getRoom(room);
if (clients && clients.size > 0) {
for (const client of clients) {
if (client === excludeClient) continue;
client.emit('chat/delta', packet);
}
}
},

async getDeltasSince(lastDeltaId) {
const data = await domain.chat.load();
const deltas = data.deltas || [];
const deltas = await domain.chat.getDeltas();
return deltas.slice(lastDeltaId);
},

async getState() {
const data = await domain.chat.load();
return data.messages || {};
async getState(room) {
const ids = await domain.chat.getMessageIds(room);
const messages = {};
for (const id of ids) {
const message = await domain.chat.getMessage(id);
if (message) messages[id] = message;
}
return messages;
},
});
6 changes: 4 additions & 2 deletions application/static/domain.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class ChatApplication extends Application {
this.post({ type: 'delta', data: deltas });
if (this.connected) {
try {
await this.metacom.api.chat.applyDelta({ deltas });
await this.metacom.api.chat.applyDelta({ deltas, room: 'sync' });
} catch (err) {
this.logger.log('Server sync failed:', err);
}
Expand Down Expand Up @@ -289,7 +289,9 @@ class ChatApplication extends Application {
}
this.post({ type: 'delta', data: [delta] });
if (this.metacom?.api?.chat && this.connected) {
this.metacom.api.chat.applyDelta({ deltas: [delta] }).catch(() => {});
this.metacom.api.chat
.applyDelta({ deltas: [delta], room: 'sync' })
.catch(() => {});
}
this.logger.log('Added reaction:', reaction, 'to message:', messageId);
});
Expand Down
8 changes: 5 additions & 3 deletions application/static/metacom.js
Original file line number Diff line number Diff line change
Expand Up @@ -501,9 +501,10 @@ class EventTransport extends Metacom {
else this.binary(data);
}
};
serviceWorker.addEventListener('message', this.messageHandler);
this.connected = true;
resolve();
});
serviceWorker.addEventListener('message', this.messageHandler);
resolve();
});
return this.opening;
}
Expand Down Expand Up @@ -596,12 +597,13 @@ class MetacomProxy extends Emitter {
this.connection.write(data);
}

static async broadcast(message) {
static async broadcast(message, excludeClient) {
const clients = await self.clients.matchAll({
includeUncontrolled: true,
type: 'window',
});
for (const client of clients) {
if (excludeClient && client.id === excludeClient.id) continue;
client.postMessage(message);
}
}
Expand Down
2 changes: 1 addition & 1 deletion application/static/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ const events = {
delta: (source, data) => {
syncManager.applyDelta(data);
syncManager.lastDeltaId += data.length;
MetacomProxy.broadcast({ type: 'delta', data });
MetacomProxy.broadcast({ type: 'delta', data }, source);
},
username: (source, data) => {
MetacomProxy.broadcast({ type: 'username', data });
Expand Down