From e9a1e44cdb238bc63be30fdca184008c14765cfb Mon Sep 17 00:00:00 2001 From: Radoslaw Kaminski Date: Tue, 10 Mar 2026 14:42:55 +0000 Subject: [PATCH 1/7] add test --- tests/integration.cpp | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/integration.cpp b/tests/integration.cpp index 558856c..86bacb0 100644 --- a/tests/integration.cpp +++ b/tests/integration.cpp @@ -306,6 +306,47 @@ private slots: QVERIFY(node.syncLibp2pStop().ok); } + + void directDialStreamExchange() + { + const int PING_SIZE = 32; + + Libp2pModulePlugin nodeA; + Libp2pModulePlugin nodeB; + + QVERIFY(nodeA.syncLibp2pStart().ok); + QVERIFY(nodeB.syncLibp2pStart().ok); + + PeerInfo infoBPeerInfo = nodeB.syncPeerInfo().data.value(); + + // connect A to B + QVERIFY(nodeA.syncConnectPeer(infoBPeerInfo.peerId, infoBPeerInfo.addrs, 500).ok); + + // A dials B directly on /ipfs/ping/1.0.0 + Libp2pResult dialResult = nodeA.syncDial(infoBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + QVERIFY(dialResult.ok); + + uint64_t streamId = dialResult.data.value(); + QVERIFY(streamId != 0); + + // send ping payload + QByteArray payload(PING_SIZE, 0); + for (int i = 0; i < PING_SIZE; ++i) + payload[i] = static_cast(i); + QVERIFY(nodeA.syncStreamWrite(streamId, payload).ok); + + // read ping echo + Libp2pResult readResult = nodeA.syncStreamReadExactly(streamId, PING_SIZE); + QVERIFY(readResult.ok); + QCOMPARE(readResult.data.value(), payload); + + // cleanup + QVERIFY(nodeA.syncStreamCloseWithEOF(streamId).ok); + QVERIFY(nodeA.syncStreamRelease(streamId).ok); + + QVERIFY(nodeA.syncLibp2pStop().ok); + QVERIFY(nodeB.syncLibp2pStop().ok); + } }; QTEST_MAIN(TestIntegration) From ca393fca5df00d8fce6148c4a38b0414c819b7ee Mon Sep 17 00:00:00 2001 From: Radoslaw Kaminski Date: Tue, 10 Mar 2026 14:49:34 +0000 Subject: [PATCH 2/7] add test --- tests/integration.cpp | 45 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/integration.cpp b/tests/integration.cpp index 86bacb0..12c267c 100644 --- a/tests/integration.cpp +++ b/tests/integration.cpp @@ -53,6 +53,51 @@ private slots: QVERIFY(nodeB.syncLibp2pStop().ok); } + void reconnectAfterDisconnect() + { + Libp2pModulePlugin nodeA; + Libp2pModulePlugin nodeB; + + QVERIFY(nodeA.syncLibp2pStart().ok); + QVERIFY(nodeB.syncLibp2pStart().ok); + + PeerInfo infoBPeerInfo = nodeB.syncPeerInfo().data.value(); + + // connect + QVERIFY(nodeA.syncConnectPeer(infoBPeerInfo.peerId, infoBPeerInfo.addrs, 500).ok); + QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value>().size(), 1); + + // disconnect + QVERIFY(nodeA.syncDisconnectPeer(infoBPeerInfo.peerId).ok); + QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value>().size(), 0); + + // reconnect + QVERIFY(nodeA.syncConnectPeer(infoBPeerInfo.peerId, infoBPeerInfo.addrs, 500).ok); + QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value>().size(), 1); + + // verify the connection works by dialing ping + const int PING_SIZE = 32; + Libp2pResult dialResult = nodeA.syncDial(infoBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + QVERIFY(dialResult.ok); + + uint64_t streamId = dialResult.data.value(); + + QByteArray payload(PING_SIZE, 0); + for (int i = 0; i < PING_SIZE; ++i) + payload[i] = static_cast(i); + QVERIFY(nodeA.syncStreamWrite(streamId, payload).ok); + + Libp2pResult readResult = nodeA.syncStreamReadExactly(streamId, PING_SIZE); + QVERIFY(readResult.ok); + QCOMPARE(readResult.data.value(), payload); + + QVERIFY(nodeA.syncStreamCloseWithEOF(streamId).ok); + QVERIFY(nodeA.syncStreamRelease(streamId).ok); + + QVERIFY(nodeA.syncLibp2pStop().ok); + QVERIFY(nodeB.syncLibp2pStop().ok); + } + void kadPutGet() { // setup node A From ea0fb1d27864dd2eb633d673a24e10c7a6fd97f2 Mon Sep 17 00:00:00 2001 From: Radoslaw Kaminski Date: Tue, 10 Mar 2026 14:56:47 +0000 Subject: [PATCH 3/7] add test --- tests/integration.cpp | 54 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/tests/integration.cpp b/tests/integration.cpp index 12c267c..f2a04f6 100644 --- a/tests/integration.cpp +++ b/tests/integration.cpp @@ -352,6 +352,60 @@ private slots: QVERIFY(node.syncLibp2pStop().ok); } + void multipleStreamsOnSameConnection() + { + const int PING_SIZE = 32; + + Libp2pModulePlugin nodeA; + Libp2pModulePlugin nodeB; + + QVERIFY(nodeA.syncLibp2pStart().ok); + QVERIFY(nodeB.syncLibp2pStart().ok); + + PeerInfo infoBPeerInfo = nodeB.syncPeerInfo().data.value(); + QVERIFY(nodeA.syncConnectPeer(infoBPeerInfo.peerId, infoBPeerInfo.addrs, 500).ok); + + // open two streams on the same connection + Libp2pResult dial1 = nodeA.syncDial(infoBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + QVERIFY(dial1.ok); + uint64_t stream1 = dial1.data.value(); + + Libp2pResult dial2 = nodeA.syncDial(infoBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + QVERIFY(dial2.ok); + uint64_t stream2 = dial2.data.value(); + + QVERIFY(stream1 != stream2); + + // write different payloads to each stream + QByteArray payload1(PING_SIZE, 0); + QByteArray payload2(PING_SIZE, 0); + for (int i = 0; i < PING_SIZE; ++i) { + payload1[i] = static_cast(i); + payload2[i] = static_cast(255 - i); + } + + QVERIFY(nodeA.syncStreamWrite(stream1, payload1).ok); + QVERIFY(nodeA.syncStreamWrite(stream2, payload2).ok); + + // read back from each stream and verify no cross-contamination + Libp2pResult read1 = nodeA.syncStreamReadExactly(stream1, PING_SIZE); + QVERIFY(read1.ok); + QCOMPARE(read1.data.value(), payload1); + + Libp2pResult read2 = nodeA.syncStreamReadExactly(stream2, PING_SIZE); + QVERIFY(read2.ok); + QCOMPARE(read2.data.value(), payload2); + + // cleanup both streams + QVERIFY(nodeA.syncStreamCloseWithEOF(stream1).ok); + QVERIFY(nodeA.syncStreamRelease(stream1).ok); + QVERIFY(nodeA.syncStreamCloseWithEOF(stream2).ok); + QVERIFY(nodeA.syncStreamRelease(stream2).ok); + + QVERIFY(nodeA.syncLibp2pStop().ok); + QVERIFY(nodeB.syncLibp2pStop().ok); + } + void directDialStreamExchange() { const int PING_SIZE = 32; From 53e2c4c9e5bb4da8176a612ff238a1306cf07478 Mon Sep 17 00:00:00 2001 From: Radoslaw Kaminski Date: Tue, 10 Mar 2026 14:59:42 +0000 Subject: [PATCH 4/7] add test --- tests/integration.cpp | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/integration.cpp b/tests/integration.cpp index f2a04f6..3c94f04 100644 --- a/tests/integration.cpp +++ b/tests/integration.cpp @@ -352,6 +352,42 @@ private slots: QVERIFY(node.syncLibp2pStop().ok); } + void gossipsubBinaryPayload() + { + Libp2pModulePlugin nodeA; + Libp2pModulePlugin nodeB; + + QVERIFY(nodeA.syncLibp2pStart().ok); + QVERIFY(nodeB.syncLibp2pStart().ok); + + PeerInfo infoA = nodeA.syncPeerInfo().data.value(); + QVERIFY(nodeB.syncConnectPeer(infoA.peerId, infoA.addrs, 500).ok); + + QString topic = "binary-topic"; + QVERIFY(nodeB.syncGossipsubSubscribe(topic).ok); + QVERIFY(nodeA.syncGossipsubSubscribe(topic).ok); + QThread::msleep(2000); + + // payload with embedded null bytes — would be truncated + // if any part of the callback chain treats data as C string + QByteArray payload; + payload.append('\x01'); + payload.append('\x00'); + payload.append('\x02'); + payload.append('\x00'); + payload.append('\x03'); + QCOMPARE(payload.size(), 5); + + QVERIFY(nodeA.syncGossipsubPublish(topic, payload).ok); + + QByteArray received = nodeB.syncGossipsubNextMessage(topic).data.value(); + QCOMPARE(received.size(), 5); + QCOMPARE(received, payload); + + QVERIFY(nodeA.syncLibp2pStop().ok); + QVERIFY(nodeB.syncLibp2pStop().ok); + } + void multipleStreamsOnSameConnection() { const int PING_SIZE = 32; From b87b1ec209c00b433fbd158c0ecc70808519df6a Mon Sep 17 00:00:00 2001 From: Radoslaw Kaminski Date: Tue, 10 Mar 2026 15:04:31 +0000 Subject: [PATCH 5/7] add test --- tests/integration.cpp | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/integration.cpp b/tests/integration.cpp index 3c94f04..0af3af7 100644 --- a/tests/integration.cpp +++ b/tests/integration.cpp @@ -132,6 +132,42 @@ private slots: QVERIFY(nodeB.syncLibp2pStop().ok); } + void kadFindNode() + { + // node A is the bootstrap node + Libp2pModulePlugin nodeA; + QVERIFY(nodeA.syncLibp2pStart().ok); + PeerInfo infoA = nodeA.syncPeerInfo().data.value(); + + // nodes B, C, D all bootstrap from A to form a DHT + Libp2pModulePlugin nodeB(Libp2pModuleOptions{ .bootstrapNodes = { infoA } }); + QVERIFY(nodeB.syncLibp2pStart().ok); + PeerInfo infoB = nodeB.syncPeerInfo().data.value(); + + Libp2pModulePlugin nodeC(Libp2pModuleOptions{ .bootstrapNodes = { infoA } }); + QVERIFY(nodeC.syncLibp2pStart().ok); + PeerInfo infoC = nodeC.syncPeerInfo().data.value(); + + Libp2pModulePlugin nodeD(Libp2pModuleOptions{ .bootstrapNodes = { infoA } }); + QVERIFY(nodeD.syncLibp2pStart().ok); + PeerInfo infoD = nodeD.syncPeerInfo().data.value(); + + // D looks up B's peer ID — must discover it through the DHT + Libp2pResult result = nodeD.syncKadFindNode(infoB.peerId); + QVERIFY(result.ok); + + QList peers = result.data.value>(); + QCOMPARE(peers.size(), 3); + QVERIFY(peers.contains(infoA.peerId)); + QVERIFY(peers.contains(infoB.peerId)); + QVERIFY(peers.contains(infoC.peerId)); + + QVERIFY(nodeA.syncLibp2pStop().ok); + QVERIFY(nodeB.syncLibp2pStop().ok); + QVERIFY(nodeC.syncLibp2pStop().ok); + QVERIFY(nodeD.syncLibp2pStop().ok); + } + void mixDialAndReply() { const int NUM_NODES = 5; From cff1673885e8e874eb6d70ffe9a82106ef0c89ad Mon Sep 17 00:00:00 2001 From: Radoslaw Kaminski Date: Tue, 10 Mar 2026 15:25:03 +0000 Subject: [PATCH 6/7] copilot code review --- tests/integration.cpp | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/tests/integration.cpp b/tests/integration.cpp index 0af3af7..d01c83e 100644 --- a/tests/integration.cpp +++ b/tests/integration.cpp @@ -61,23 +61,23 @@ private slots: QVERIFY(nodeA.syncLibp2pStart().ok); QVERIFY(nodeB.syncLibp2pStart().ok); - PeerInfo infoBPeerInfo = nodeB.syncPeerInfo().data.value(); + PeerInfo nodeBPeerInfo = nodeB.syncPeerInfo().data.value(); // connect - QVERIFY(nodeA.syncConnectPeer(infoBPeerInfo.peerId, infoBPeerInfo.addrs, 500).ok); + QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok); QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value>().size(), 1); // disconnect - QVERIFY(nodeA.syncDisconnectPeer(infoBPeerInfo.peerId).ok); + QVERIFY(nodeA.syncDisconnectPeer(nodeBPeerInfo.peerId).ok); QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value>().size(), 0); // reconnect - QVERIFY(nodeA.syncConnectPeer(infoBPeerInfo.peerId, infoBPeerInfo.addrs, 500).ok); + QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok); QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value>().size(), 1); // verify the connection works by dialing ping const int PING_SIZE = 32; - Libp2pResult dialResult = nodeA.syncDial(infoBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + Libp2pResult dialResult = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0"); QVERIFY(dialResult.ok); uint64_t streamId = dialResult.data.value(); @@ -150,7 +150,6 @@ private slots: Libp2pModulePlugin nodeD(Libp2pModuleOptions{ .bootstrapNodes = { infoA } }); QVERIFY(nodeD.syncLibp2pStart().ok); - PeerInfo infoD = nodeD.syncPeerInfo().data.value(); // D looks up B's peer ID — must discover it through the DHT Libp2pResult result = nodeD.syncKadFindNode(infoB.peerId); @@ -416,7 +415,10 @@ private slots: QVERIFY(nodeA.syncGossipsubPublish(topic, payload).ok); - QByteArray received = nodeB.syncGossipsubNextMessage(topic).data.value(); + Libp2pResult res = nodeB.syncGossipsubNextMessage(topic); + QVERIFY(res.ok); + QByteArray received = res.data.value(); + QCOMPARE(received.size(), 5); QCOMPARE(received, payload); @@ -434,15 +436,15 @@ private slots: QVERIFY(nodeA.syncLibp2pStart().ok); QVERIFY(nodeB.syncLibp2pStart().ok); - PeerInfo infoBPeerInfo = nodeB.syncPeerInfo().data.value(); - QVERIFY(nodeA.syncConnectPeer(infoBPeerInfo.peerId, infoBPeerInfo.addrs, 500).ok); + PeerInfo nodeBPeerInfo = nodeB.syncPeerInfo().data.value(); + QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok); // open two streams on the same connection - Libp2pResult dial1 = nodeA.syncDial(infoBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + Libp2pResult dial1 = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0"); QVERIFY(dial1.ok); uint64_t stream1 = dial1.data.value(); - Libp2pResult dial2 = nodeA.syncDial(infoBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + Libp2pResult dial2 = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0"); QVERIFY(dial2.ok); uint64_t stream2 = dial2.data.value(); @@ -488,13 +490,13 @@ private slots: QVERIFY(nodeA.syncLibp2pStart().ok); QVERIFY(nodeB.syncLibp2pStart().ok); - PeerInfo infoBPeerInfo = nodeB.syncPeerInfo().data.value(); + PeerInfo nodeBPeerInfo = nodeB.syncPeerInfo().data.value(); // connect A to B - QVERIFY(nodeA.syncConnectPeer(infoBPeerInfo.peerId, infoBPeerInfo.addrs, 500).ok); + QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok); // A dials B directly on /ipfs/ping/1.0.0 - Libp2pResult dialResult = nodeA.syncDial(infoBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + Libp2pResult dialResult = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0"); QVERIFY(dialResult.ok); uint64_t streamId = dialResult.data.value(); From ccc38a48c78ddeb46a8acba6f52eeb0ad88de859 Mon Sep 17 00:00:00 2001 From: Radoslaw Kaminski Date: Wed, 11 Mar 2026 17:02:41 +0000 Subject: [PATCH 7/7] Revert "add test" This reverts commit b87b1ec209c00b433fbd158c0ecc70808519df6a. --- tests/integration.cpp | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/tests/integration.cpp b/tests/integration.cpp index d01c83e..cf9d8d1 100644 --- a/tests/integration.cpp +++ b/tests/integration.cpp @@ -132,41 +132,6 @@ private slots: QVERIFY(nodeB.syncLibp2pStop().ok); } - void kadFindNode() - { - // node A is the bootstrap node - Libp2pModulePlugin nodeA; - QVERIFY(nodeA.syncLibp2pStart().ok); - PeerInfo infoA = nodeA.syncPeerInfo().data.value(); - - // nodes B, C, D all bootstrap from A to form a DHT - Libp2pModulePlugin nodeB(Libp2pModuleOptions{ .bootstrapNodes = { infoA } }); - QVERIFY(nodeB.syncLibp2pStart().ok); - PeerInfo infoB = nodeB.syncPeerInfo().data.value(); - - Libp2pModulePlugin nodeC(Libp2pModuleOptions{ .bootstrapNodes = { infoA } }); - QVERIFY(nodeC.syncLibp2pStart().ok); - PeerInfo infoC = nodeC.syncPeerInfo().data.value(); - - Libp2pModulePlugin nodeD(Libp2pModuleOptions{ .bootstrapNodes = { infoA } }); - QVERIFY(nodeD.syncLibp2pStart().ok); - - // D looks up B's peer ID — must discover it through the DHT - Libp2pResult result = nodeD.syncKadFindNode(infoB.peerId); - QVERIFY(result.ok); - - QList peers = result.data.value>(); - QCOMPARE(peers.size(), 3); - QVERIFY(peers.contains(infoA.peerId)); - QVERIFY(peers.contains(infoB.peerId)); - QVERIFY(peers.contains(infoC.peerId)); - - QVERIFY(nodeA.syncLibp2pStop().ok); - QVERIFY(nodeB.syncLibp2pStop().ok); - QVERIFY(nodeC.syncLibp2pStop().ok); - QVERIFY(nodeD.syncLibp2pStop().ok); - } - void mixDialAndReply() { const int NUM_NODES = 5;