diff --git a/package.json b/package.json index a854539..d680a20 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hypermind", - "version": "0.9.0", + "version": "0.9.1", "description": "A decentralized P2P counter of active deployments", "main": "server.js", "scripts": { diff --git a/public/app.js b/public/app.js index cf7bbcf..5397171 100644 --- a/public/app.js +++ b/public/app.js @@ -447,10 +447,23 @@ const getColorFromId = (id) => { return "#" + "00000".substring(0, 6 - c.length) + c; }; +const seenMessageIds = new Set(); +const messageIdHistory = []; + const appendMessage = (msg) => { const div = document.createElement("div"); if (msg.type === "CHAT") { + if (msg.id) { + if (seenMessageIds.has(msg.id)) return; + seenMessageIds.add(msg.id); + messageIdHistory.push(msg.id); + if (messageIdHistory.length > 100) { + const oldest = messageIdHistory.shift(); + seenMessageIds.delete(oldest); + } + } + // Block check if (blockedUsers.has(msg.sender)) return; diff --git a/src/p2p/swarm.js b/src/p2p/swarm.js index e8c1f0a..9fbe592 100644 --- a/src/p2p/swarm.js +++ b/src/p2p/swarm.js @@ -1,172 +1,203 @@ const Hyperswarm = require("hyperswarm"); const { signMessage } = require("../core/security"); -const { TOPIC, TOPIC_NAME, HEARTBEAT_INTERVAL, MAX_CONNECTIONS, CONNECTION_ROTATION_INTERVAL, ENABLE_CHAT } = require("../config/constants"); +const { + TOPIC, + TOPIC_NAME, + HEARTBEAT_INTERVAL, + MAX_CONNECTIONS, + CONNECTION_ROTATION_INTERVAL, + ENABLE_CHAT, +} = require("../config/constants"); const { generateScreenname } = require("../utils/name-generator"); class SwarmManager { - constructor(identity, peerManager, diagnostics, messageHandler, relayFn, broadcastFn, chatSystemFn) { - this.identity = identity; - this.peerManager = peerManager; - this.diagnostics = diagnostics; - this.messageHandler = messageHandler; - this.relayFn = relayFn; - this.broadcastFn = broadcastFn; - this.chatSystemFn = chatSystemFn; - - this.swarm = new Hyperswarm(); - this.heartbeatInterval = null; - this.rotationInterval = null; + constructor( + identity, + peerManager, + diagnostics, + messageHandler, + relayFn, + broadcastFn, + chatSystemFn + ) { + this.identity = identity; + this.peerManager = peerManager; + this.diagnostics = diagnostics; + this.messageHandler = messageHandler; + this.relayFn = relayFn; + this.broadcastFn = broadcastFn; + this.chatSystemFn = chatSystemFn; + + this.swarm = new Hyperswarm(); + this.heartbeatInterval = null; + this.rotationInterval = null; + } + + async start() { + this.swarm.on("connection", (socket) => this.handleConnection(socket)); + + const discovery = this.swarm.join(TOPIC); + await discovery.flushed(); + + this.startHeartbeat(); + this.startRotation(); + } + + handleConnection(socket) { + if (this.swarm.connections.size > MAX_CONNECTIONS) { + socket.destroy(); + return; } - async start() { - this.swarm.on("connection", (socket) => this.handleConnection(socket)); - - const discovery = this.swarm.join(TOPIC); - await discovery.flushed(); - - this.startHeartbeat(); - this.startRotation(); - } + socket.connectedAt = Date.now(); + + const sig = signMessage( + `seq:${this.peerManager.getSeq()}`, + this.identity.privateKey + ); + const hello = JSON.stringify({ + type: "HEARTBEAT", + id: this.identity.id, + seq: this.peerManager.getSeq(), + hops: 0, + nonce: this.identity.nonce, + sig, + }); + socket.write(hello); + this.broadcastFn(); + + socket.buffer = ""; + + socket.on("data", (data) => { + this.diagnostics.increment("bytesReceived", data.length); + socket.buffer += data.toString(); + + const lines = socket.buffer.split("\n"); + + socket.buffer = lines.pop(); + + for (const msgStr of lines) { + if (!msgStr.trim()) continue; + try { + const msg = JSON.parse(msgStr); + this.messageHandler.handleMessage(msg, socket); + } catch (e) {} + } + }); + + socket.on("close", () => { + if (socket.peerId && this.peerManager.hasPeer(socket.peerId)) { + this.peerManager.removePeer(socket.peerId); + } + this.broadcastFn(); + }); + + socket.on("error", () => {}); + } + + startHeartbeat() { + this.heartbeatInterval = setInterval(() => { + const seq = this.peerManager.incrementSeq(); + this.peerManager.addOrUpdatePeer(this.identity.id, seq, null); + + this.messageHandler.bloomFilter.markRelayed(this.identity.id, seq); + + const sig = signMessage(`seq:${seq}`, this.identity.privateKey); + const heartbeat = + JSON.stringify({ + type: "HEARTBEAT", + id: this.identity.id, + seq, + hops: 0, + nonce: this.identity.nonce, + sig, + }) + "\n"; - handleConnection(socket) { - if (this.swarm.connections.size > MAX_CONNECTIONS) { - socket.destroy(); - return; - } + for (const socket of this.swarm.connections) { + socket.write(heartbeat); + } - socket.connectedAt = Date.now(); - - const sig = signMessage(`seq:${this.peerManager.getSeq()}`, this.identity.privateKey); - const hello = JSON.stringify({ - type: "HEARTBEAT", - id: this.identity.id, - seq: this.peerManager.getSeq(), - hops: 0, - nonce: this.identity.nonce, - sig, - }); - socket.write(hello); + const removed = this.peerManager.cleanupStalePeers(); + if (removed > 0) { this.broadcastFn(); - - socket.buffer = ""; - - socket.on("data", (data) => { - this.diagnostics.increment("bytesReceived", data.length); - socket.buffer += data.toString(); - - const lines = socket.buffer.split("\n"); - // The last element is either an empty string (if data ended with \n) - // or the incomplete part of the next message. - socket.buffer = lines.pop(); - - for (const msgStr of lines) { - if (!msgStr.trim()) continue; - try { - const msg = JSON.parse(msgStr); - this.messageHandler.handleMessage(msg, socket); - } catch (e) { - // Invalid JSON or partial message (shouldn't happen with buffering logic unless data is corrupted) - } - } - }); - - socket.on("close", () => { - if (socket.peerId && this.peerManager.hasPeer(socket.peerId)) { - this.peerManager.removePeer(socket.peerId); - } - this.broadcastFn(); - }); - - socket.on("error", () => { }); + } + }, HEARTBEAT_INTERVAL); + } + + startRotation() { + this.rotationInterval = setInterval(() => { + if (this.swarm.connections.size < MAX_CONNECTIONS / 2) return; + + let oldest = null; + for (const socket of this.swarm.connections) { + if (!oldest || socket.connectedAt < oldest.connectedAt) { + oldest = socket; + } + } + + if (oldest) { + if (ENABLE_CHAT && this.chatSystemFn && oldest.peerId) { + this.chatSystemFn({ + type: "SYSTEM", + content: `Connection with Node ...${oldest.peerId.slice( + -8 + )} severed (Rotation).`, + timestamp: Date.now(), + }); + } + oldest.destroy(); + } + }, CONNECTION_ROTATION_INTERVAL); + } + + shutdown() { + const sig = signMessage( + `type:LEAVE:${this.identity.id}`, + this.identity.privateKey + ); + const goodbye = + JSON.stringify({ + type: "LEAVE", + id: this.identity.id, + hops: 0, + sig, + }) + "\n"; + + this.messageHandler.bloomFilter.markRelayed(this.identity.id, "leave"); + + for (const socket of this.swarm.connections) { + socket.write(goodbye); } - startHeartbeat() { - this.heartbeatInterval = setInterval(() => { - const seq = this.peerManager.incrementSeq(); - this.peerManager.addOrUpdatePeer(this.identity.id, seq, null); - - const sig = signMessage(`seq:${seq}`, this.identity.privateKey); - const heartbeat = JSON.stringify({ - type: "HEARTBEAT", - id: this.identity.id, - seq, - hops: 0, - nonce: this.identity.nonce, - sig, - }) + "\n"; - - for (const socket of this.swarm.connections) { - socket.write(heartbeat); - } - - const removed = this.peerManager.cleanupStalePeers(); - if (removed > 0) { - this.broadcastFn(); - } - }, HEARTBEAT_INTERVAL); + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); } - startRotation() { - this.rotationInterval = setInterval(() => { - if (this.swarm.connections.size < MAX_CONNECTIONS / 2) return; - - let oldest = null; - for (const socket of this.swarm.connections) { - if (!oldest || socket.connectedAt < oldest.connectedAt) { - oldest = socket; - } - } - - if (oldest) { - if (ENABLE_CHAT && this.chatSystemFn && oldest.peerId) { - this.chatSystemFn({ - type: "SYSTEM", - content: `Connection with Node ...${oldest.peerId.slice(-8)} severed (Rotation).`, - timestamp: Date.now() - }); - } - oldest.destroy(); - } - }, CONNECTION_ROTATION_INTERVAL); + if (this.rotationInterval) { + clearInterval(this.rotationInterval); } - shutdown() { - const sig = signMessage(`type:LEAVE:${this.identity.id}`, this.identity.privateKey); - const goodbye = JSON.stringify({ - type: "LEAVE", - id: this.identity.id, - hops: 0, - sig, - }) + "\n"; - - for (const socket of this.swarm.connections) { - socket.write(goodbye); - } - - if (this.heartbeatInterval) { - clearInterval(this.heartbeatInterval); - } + setTimeout(() => { + process.exit(0); + }, 500); + } - if (this.rotationInterval) { - clearInterval(this.rotationInterval); - } + getSwarm() { + return this.swarm; + } - setTimeout(() => { - process.exit(0); - }, 500); - } + broadcastChat(msg) { + if (!ENABLE_CHAT) return; - getSwarm() { - return this.swarm; + if (msg.id) { + this.messageHandler.bloomFilter.markRelayed(msg.id, "chat"); } - broadcastChat(msg) { - if (!ENABLE_CHAT) return; - const msgStr = JSON.stringify(msg) + "\n"; - for (const socket of this.swarm.connections) { - socket.write(msgStr); - } + const msgStr = JSON.stringify(msg) + "\n"; + for (const socket of this.swarm.connections) { + socket.write(msgStr); } + } } module.exports = { SwarmManager }; diff --git a/src/state/peers.js b/src/state/peers.js index 7b7882d..155de15 100644 --- a/src/state/peers.js +++ b/src/state/peers.js @@ -11,6 +11,12 @@ class PeerManager { addOrUpdatePeer(id, seq, ip = null) { const stored = this.seenPeers.get(id); + + // If we have a stored peer, only update if the new sequence is higher + if (stored && seq <= stored.seq) { + return false; + } + const wasNew = !stored; // Track in HyperLogLog for total unique estimation