diff --git a/tests/integration.cpp b/tests/integration.cpp index 558856c..cf9d8d1 100644 --- a/tests/integration.cpp +++ b/tests/integration.cpp @@ -53,6 +53,51 @@ private slots: QVERIFY(nodeB.syncLibp2pStop().ok); } + void reconnectAfterDisconnect() + { + Libp2pModulePlugin nodeA; + Libp2pModulePlugin nodeB; + + QVERIFY(nodeA.syncLibp2pStart().ok); + QVERIFY(nodeB.syncLibp2pStart().ok); + + PeerInfo nodeBPeerInfo = nodeB.syncPeerInfo().data.value(); + + // connect + QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok); + QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value>().size(), 1); + + // disconnect + QVERIFY(nodeA.syncDisconnectPeer(nodeBPeerInfo.peerId).ok); + QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value>().size(), 0); + + // reconnect + QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok); + QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value>().size(), 1); + + // verify the connection works by dialing ping + const int PING_SIZE = 32; + Libp2pResult dialResult = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + QVERIFY(dialResult.ok); + + uint64_t streamId = dialResult.data.value(); + + QByteArray payload(PING_SIZE, 0); + for (int i = 0; i < PING_SIZE; ++i) + payload[i] = static_cast(i); + QVERIFY(nodeA.syncStreamWrite(streamId, payload).ok); + + Libp2pResult readResult = nodeA.syncStreamReadExactly(streamId, PING_SIZE); + QVERIFY(readResult.ok); + QCOMPARE(readResult.data.value(), payload); + + QVERIFY(nodeA.syncStreamCloseWithEOF(streamId).ok); + QVERIFY(nodeA.syncStreamRelease(streamId).ok); + + QVERIFY(nodeA.syncLibp2pStop().ok); + QVERIFY(nodeB.syncLibp2pStop().ok); + } + void kadPutGet() { // setup node A @@ -306,6 +351,140 @@ private slots: QVERIFY(node.syncLibp2pStop().ok); } + + void gossipsubBinaryPayload() + { + Libp2pModulePlugin nodeA; + Libp2pModulePlugin nodeB; + + QVERIFY(nodeA.syncLibp2pStart().ok); + QVERIFY(nodeB.syncLibp2pStart().ok); + + PeerInfo infoA = nodeA.syncPeerInfo().data.value(); + QVERIFY(nodeB.syncConnectPeer(infoA.peerId, infoA.addrs, 500).ok); + + QString topic = "binary-topic"; + QVERIFY(nodeB.syncGossipsubSubscribe(topic).ok); + QVERIFY(nodeA.syncGossipsubSubscribe(topic).ok); + QThread::msleep(2000); + + // payload with embedded null bytes — would be truncated + // if any part of the callback chain treats data as C string + QByteArray payload; + payload.append('\x01'); + payload.append('\x00'); + payload.append('\x02'); + payload.append('\x00'); + payload.append('\x03'); + QCOMPARE(payload.size(), 5); + + QVERIFY(nodeA.syncGossipsubPublish(topic, payload).ok); + + Libp2pResult res = nodeB.syncGossipsubNextMessage(topic); + QVERIFY(res.ok); + QByteArray received = res.data.value(); + + QCOMPARE(received.size(), 5); + QCOMPARE(received, payload); + + QVERIFY(nodeA.syncLibp2pStop().ok); + QVERIFY(nodeB.syncLibp2pStop().ok); + } + + void multipleStreamsOnSameConnection() + { + const int PING_SIZE = 32; + + Libp2pModulePlugin nodeA; + Libp2pModulePlugin nodeB; + + QVERIFY(nodeA.syncLibp2pStart().ok); + QVERIFY(nodeB.syncLibp2pStart().ok); + + PeerInfo nodeBPeerInfo = nodeB.syncPeerInfo().data.value(); + QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok); + + // open two streams on the same connection + Libp2pResult dial1 = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + QVERIFY(dial1.ok); + uint64_t stream1 = dial1.data.value(); + + Libp2pResult dial2 = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + QVERIFY(dial2.ok); + uint64_t stream2 = dial2.data.value(); + + QVERIFY(stream1 != stream2); + + // write different payloads to each stream + QByteArray payload1(PING_SIZE, 0); + QByteArray payload2(PING_SIZE, 0); + for (int i = 0; i < PING_SIZE; ++i) { + payload1[i] = static_cast(i); + payload2[i] = static_cast(255 - i); + } + + QVERIFY(nodeA.syncStreamWrite(stream1, payload1).ok); + QVERIFY(nodeA.syncStreamWrite(stream2, payload2).ok); + + // read back from each stream and verify no cross-contamination + Libp2pResult read1 = nodeA.syncStreamReadExactly(stream1, PING_SIZE); + QVERIFY(read1.ok); + QCOMPARE(read1.data.value(), payload1); + + Libp2pResult read2 = nodeA.syncStreamReadExactly(stream2, PING_SIZE); + QVERIFY(read2.ok); + QCOMPARE(read2.data.value(), payload2); + + // cleanup both streams + QVERIFY(nodeA.syncStreamCloseWithEOF(stream1).ok); + QVERIFY(nodeA.syncStreamRelease(stream1).ok); + QVERIFY(nodeA.syncStreamCloseWithEOF(stream2).ok); + QVERIFY(nodeA.syncStreamRelease(stream2).ok); + + QVERIFY(nodeA.syncLibp2pStop().ok); + QVERIFY(nodeB.syncLibp2pStop().ok); + } + + void directDialStreamExchange() + { + const int PING_SIZE = 32; + + Libp2pModulePlugin nodeA; + Libp2pModulePlugin nodeB; + + QVERIFY(nodeA.syncLibp2pStart().ok); + QVERIFY(nodeB.syncLibp2pStart().ok); + + PeerInfo nodeBPeerInfo = nodeB.syncPeerInfo().data.value(); + + // connect A to B + QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok); + + // A dials B directly on /ipfs/ping/1.0.0 + Libp2pResult dialResult = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0"); + QVERIFY(dialResult.ok); + + uint64_t streamId = dialResult.data.value(); + QVERIFY(streamId != 0); + + // send ping payload + QByteArray payload(PING_SIZE, 0); + for (int i = 0; i < PING_SIZE; ++i) + payload[i] = static_cast(i); + QVERIFY(nodeA.syncStreamWrite(streamId, payload).ok); + + // read ping echo + Libp2pResult readResult = nodeA.syncStreamReadExactly(streamId, PING_SIZE); + QVERIFY(readResult.ok); + QCOMPARE(readResult.data.value(), payload); + + // cleanup + QVERIFY(nodeA.syncStreamCloseWithEOF(streamId).ok); + QVERIFY(nodeA.syncStreamRelease(streamId).ok); + + QVERIFY(nodeA.syncLibp2pStop().ok); + QVERIFY(nodeB.syncLibp2pStop().ok); + } }; QTEST_MAIN(TestIntegration)