Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "hypermind",
"version": "0.9.0",
"version": "0.9.1",
"description": "A decentralized P2P counter of active deployments",
"main": "server.js",
"scripts": {
Expand Down
13 changes: 13 additions & 0 deletions public/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,23 @@ const getColorFromId = (id) => {
return "#" + "00000".substring(0, 6 - c.length) + c;
};

const seenMessageIds = new Set();
const messageIdHistory = [];

const appendMessage = (msg) => {
const div = document.createElement("div");

if (msg.type === "CHAT") {
if (msg.id) {
if (seenMessageIds.has(msg.id)) return;
seenMessageIds.add(msg.id);
messageIdHistory.push(msg.id);
if (messageIdHistory.length > 100) {
const oldest = messageIdHistory.shift();
seenMessageIds.delete(oldest);
}
}

// Block check
if (blockedUsers.has(msg.sender)) return;

Expand Down
325 changes: 178 additions & 147 deletions src/p2p/swarm.js
Original file line number Diff line number Diff line change
@@ -1,172 +1,203 @@
const Hyperswarm = require("hyperswarm");
const { signMessage } = require("../core/security");
const { TOPIC, TOPIC_NAME, HEARTBEAT_INTERVAL, MAX_CONNECTIONS, CONNECTION_ROTATION_INTERVAL, ENABLE_CHAT } = require("../config/constants");
const {
TOPIC,
TOPIC_NAME,
HEARTBEAT_INTERVAL,
MAX_CONNECTIONS,
CONNECTION_ROTATION_INTERVAL,
ENABLE_CHAT,
} = require("../config/constants");
const { generateScreenname } = require("../utils/name-generator");

class SwarmManager {
constructor(identity, peerManager, diagnostics, messageHandler, relayFn, broadcastFn, chatSystemFn) {
this.identity = identity;
this.peerManager = peerManager;
this.diagnostics = diagnostics;
this.messageHandler = messageHandler;
this.relayFn = relayFn;
this.broadcastFn = broadcastFn;
this.chatSystemFn = chatSystemFn;

this.swarm = new Hyperswarm();
this.heartbeatInterval = null;
this.rotationInterval = null;
constructor(
identity,
peerManager,
diagnostics,
messageHandler,
relayFn,
broadcastFn,
chatSystemFn
) {
this.identity = identity;
this.peerManager = peerManager;
this.diagnostics = diagnostics;
this.messageHandler = messageHandler;
this.relayFn = relayFn;
this.broadcastFn = broadcastFn;
this.chatSystemFn = chatSystemFn;

this.swarm = new Hyperswarm();
this.heartbeatInterval = null;
this.rotationInterval = null;
}

async start() {
this.swarm.on("connection", (socket) => this.handleConnection(socket));

const discovery = this.swarm.join(TOPIC);
await discovery.flushed();

this.startHeartbeat();
this.startRotation();
}

handleConnection(socket) {
if (this.swarm.connections.size > MAX_CONNECTIONS) {
socket.destroy();
return;
}

async start() {
this.swarm.on("connection", (socket) => this.handleConnection(socket));

const discovery = this.swarm.join(TOPIC);
await discovery.flushed();

this.startHeartbeat();
this.startRotation();
}
socket.connectedAt = Date.now();

const sig = signMessage(
`seq:${this.peerManager.getSeq()}`,
this.identity.privateKey
);
const hello = JSON.stringify({
type: "HEARTBEAT",
id: this.identity.id,
seq: this.peerManager.getSeq(),
hops: 0,
nonce: this.identity.nonce,
sig,
});
socket.write(hello);
this.broadcastFn();

socket.buffer = "";

socket.on("data", (data) => {
this.diagnostics.increment("bytesReceived", data.length);
socket.buffer += data.toString();

const lines = socket.buffer.split("\n");

socket.buffer = lines.pop();

for (const msgStr of lines) {
if (!msgStr.trim()) continue;
try {
const msg = JSON.parse(msgStr);
this.messageHandler.handleMessage(msg, socket);
} catch (e) {}
}
});

socket.on("close", () => {
if (socket.peerId && this.peerManager.hasPeer(socket.peerId)) {
this.peerManager.removePeer(socket.peerId);
}
this.broadcastFn();
});

socket.on("error", () => {});
}

startHeartbeat() {
this.heartbeatInterval = setInterval(() => {
const seq = this.peerManager.incrementSeq();
this.peerManager.addOrUpdatePeer(this.identity.id, seq, null);

this.messageHandler.bloomFilter.markRelayed(this.identity.id, seq);

const sig = signMessage(`seq:${seq}`, this.identity.privateKey);
const heartbeat =
JSON.stringify({
type: "HEARTBEAT",
id: this.identity.id,
seq,
hops: 0,
nonce: this.identity.nonce,
sig,
}) + "\n";

handleConnection(socket) {
if (this.swarm.connections.size > MAX_CONNECTIONS) {
socket.destroy();
return;
}
for (const socket of this.swarm.connections) {
socket.write(heartbeat);
}

socket.connectedAt = Date.now();

const sig = signMessage(`seq:${this.peerManager.getSeq()}`, this.identity.privateKey);
const hello = JSON.stringify({
type: "HEARTBEAT",
id: this.identity.id,
seq: this.peerManager.getSeq(),
hops: 0,
nonce: this.identity.nonce,
sig,
});
socket.write(hello);
const removed = this.peerManager.cleanupStalePeers();
if (removed > 0) {
this.broadcastFn();

socket.buffer = "";

socket.on("data", (data) => {
this.diagnostics.increment("bytesReceived", data.length);
socket.buffer += data.toString();

const lines = socket.buffer.split("\n");
// The last element is either an empty string (if data ended with \n)
// or the incomplete part of the next message.
socket.buffer = lines.pop();

for (const msgStr of lines) {
if (!msgStr.trim()) continue;
try {
const msg = JSON.parse(msgStr);
this.messageHandler.handleMessage(msg, socket);
} catch (e) {
// Invalid JSON or partial message (shouldn't happen with buffering logic unless data is corrupted)
}
}
});

socket.on("close", () => {
if (socket.peerId && this.peerManager.hasPeer(socket.peerId)) {
this.peerManager.removePeer(socket.peerId);
}
this.broadcastFn();
});

socket.on("error", () => { });
}
}, HEARTBEAT_INTERVAL);
}

startRotation() {
this.rotationInterval = setInterval(() => {
if (this.swarm.connections.size < MAX_CONNECTIONS / 2) return;

let oldest = null;
for (const socket of this.swarm.connections) {
if (!oldest || socket.connectedAt < oldest.connectedAt) {
oldest = socket;
}
}

if (oldest) {
if (ENABLE_CHAT && this.chatSystemFn && oldest.peerId) {
this.chatSystemFn({
type: "SYSTEM",
content: `Connection with Node ...${oldest.peerId.slice(
-8
)} severed (Rotation).`,
timestamp: Date.now(),
});
}
oldest.destroy();
}
}, CONNECTION_ROTATION_INTERVAL);
}

shutdown() {
const sig = signMessage(
`type:LEAVE:${this.identity.id}`,
this.identity.privateKey
);
const goodbye =
JSON.stringify({
type: "LEAVE",
id: this.identity.id,
hops: 0,
sig,
}) + "\n";

this.messageHandler.bloomFilter.markRelayed(this.identity.id, "leave");

for (const socket of this.swarm.connections) {
socket.write(goodbye);
}

startHeartbeat() {
this.heartbeatInterval = setInterval(() => {
const seq = this.peerManager.incrementSeq();
this.peerManager.addOrUpdatePeer(this.identity.id, seq, null);

const sig = signMessage(`seq:${seq}`, this.identity.privateKey);
const heartbeat = JSON.stringify({
type: "HEARTBEAT",
id: this.identity.id,
seq,
hops: 0,
nonce: this.identity.nonce,
sig,
}) + "\n";

for (const socket of this.swarm.connections) {
socket.write(heartbeat);
}

const removed = this.peerManager.cleanupStalePeers();
if (removed > 0) {
this.broadcastFn();
}
}, HEARTBEAT_INTERVAL);
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}

startRotation() {
this.rotationInterval = setInterval(() => {
if (this.swarm.connections.size < MAX_CONNECTIONS / 2) return;

let oldest = null;
for (const socket of this.swarm.connections) {
if (!oldest || socket.connectedAt < oldest.connectedAt) {
oldest = socket;
}
}

if (oldest) {
if (ENABLE_CHAT && this.chatSystemFn && oldest.peerId) {
this.chatSystemFn({
type: "SYSTEM",
content: `Connection with Node ...${oldest.peerId.slice(-8)} severed (Rotation).`,
timestamp: Date.now()
});
}
oldest.destroy();
}
}, CONNECTION_ROTATION_INTERVAL);
if (this.rotationInterval) {
clearInterval(this.rotationInterval);
}

shutdown() {
const sig = signMessage(`type:LEAVE:${this.identity.id}`, this.identity.privateKey);
const goodbye = JSON.stringify({
type: "LEAVE",
id: this.identity.id,
hops: 0,
sig,
}) + "\n";

for (const socket of this.swarm.connections) {
socket.write(goodbye);
}

if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
setTimeout(() => {
process.exit(0);
}, 500);
}

if (this.rotationInterval) {
clearInterval(this.rotationInterval);
}
getSwarm() {
return this.swarm;
}

setTimeout(() => {
process.exit(0);
}, 500);
}
broadcastChat(msg) {
if (!ENABLE_CHAT) return;

getSwarm() {
return this.swarm;
if (msg.id) {
this.messageHandler.bloomFilter.markRelayed(msg.id, "chat");
}

broadcastChat(msg) {
if (!ENABLE_CHAT) return;
const msgStr = JSON.stringify(msg) + "\n";
for (const socket of this.swarm.connections) {
socket.write(msgStr);
}
const msgStr = JSON.stringify(msg) + "\n";
for (const socket of this.swarm.connections) {
socket.write(msgStr);
}
}
}

module.exports = { SwarmManager };
6 changes: 6 additions & 0 deletions src/state/peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ class PeerManager {

addOrUpdatePeer(id, seq, ip = null) {
const stored = this.seenPeers.get(id);

// If we have a stored peer, only update if the new sequence is higher
if (stored && seq <= stored.seq) {
return false;
}

const wasNew = !stored;

// Track in HyperLogLog for total unique estimation
Expand Down