Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions tests/integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,51 @@ private slots:
QVERIFY(nodeB.syncLibp2pStop().ok);
}

void reconnectAfterDisconnect()
{
Libp2pModulePlugin nodeA;
Libp2pModulePlugin nodeB;

QVERIFY(nodeA.syncLibp2pStart().ok);
QVERIFY(nodeB.syncLibp2pStart().ok);

PeerInfo nodeBPeerInfo = nodeB.syncPeerInfo().data.value<PeerInfo>();

// connect
QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok);
QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value<QList<QString>>().size(), 1);

// disconnect
QVERIFY(nodeA.syncDisconnectPeer(nodeBPeerInfo.peerId).ok);
QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value<QList<QString>>().size(), 0);

// reconnect
QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok);
QCOMPARE(nodeA.syncConnectedPeers(Direction_Out).data.value<QList<QString>>().size(), 1);

// verify the connection works by dialing ping
const int PING_SIZE = 32;
Libp2pResult dialResult = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0");
QVERIFY(dialResult.ok);

uint64_t streamId = dialResult.data.value<uint64_t>();

QByteArray payload(PING_SIZE, 0);
for (int i = 0; i < PING_SIZE; ++i)
payload[i] = static_cast<char>(i);
QVERIFY(nodeA.syncStreamWrite(streamId, payload).ok);

Libp2pResult readResult = nodeA.syncStreamReadExactly(streamId, PING_SIZE);
QVERIFY(readResult.ok);
QCOMPARE(readResult.data.value<QByteArray>(), payload);

QVERIFY(nodeA.syncStreamCloseWithEOF(streamId).ok);
QVERIFY(nodeA.syncStreamRelease(streamId).ok);

QVERIFY(nodeA.syncLibp2pStop().ok);
QVERIFY(nodeB.syncLibp2pStop().ok);
}

void kadPutGet()
{
// setup node A
Expand Down Expand Up @@ -306,6 +351,140 @@ private slots:

QVERIFY(node.syncLibp2pStop().ok);
}

void gossipsubBinaryPayload()
{
Libp2pModulePlugin nodeA;
Libp2pModulePlugin nodeB;

QVERIFY(nodeA.syncLibp2pStart().ok);
QVERIFY(nodeB.syncLibp2pStart().ok);

PeerInfo infoA = nodeA.syncPeerInfo().data.value<PeerInfo>();
QVERIFY(nodeB.syncConnectPeer(infoA.peerId, infoA.addrs, 500).ok);

QString topic = "binary-topic";
QVERIFY(nodeB.syncGossipsubSubscribe(topic).ok);
QVERIFY(nodeA.syncGossipsubSubscribe(topic).ok);
QThread::msleep(2000);

// payload with embedded null bytes — would be truncated
// if any part of the callback chain treats data as C string
QByteArray payload;
payload.append('\x01');
payload.append('\x00');
payload.append('\x02');
payload.append('\x00');
payload.append('\x03');
QCOMPARE(payload.size(), 5);

QVERIFY(nodeA.syncGossipsubPublish(topic, payload).ok);

Libp2pResult res = nodeB.syncGossipsubNextMessage(topic);
QVERIFY(res.ok);
QByteArray received = res.data.value<QByteArray>();

QCOMPARE(received.size(), 5);
QCOMPARE(received, payload);

QVERIFY(nodeA.syncLibp2pStop().ok);
QVERIFY(nodeB.syncLibp2pStop().ok);
}

void multipleStreamsOnSameConnection()
{
const int PING_SIZE = 32;

Libp2pModulePlugin nodeA;
Libp2pModulePlugin nodeB;

QVERIFY(nodeA.syncLibp2pStart().ok);
QVERIFY(nodeB.syncLibp2pStart().ok);

PeerInfo nodeBPeerInfo = nodeB.syncPeerInfo().data.value<PeerInfo>();
QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok);

// open two streams on the same connection
Libp2pResult dial1 = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0");
QVERIFY(dial1.ok);
uint64_t stream1 = dial1.data.value<uint64_t>();

Libp2pResult dial2 = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0");
QVERIFY(dial2.ok);
uint64_t stream2 = dial2.data.value<uint64_t>();

QVERIFY(stream1 != stream2);

// write different payloads to each stream
QByteArray payload1(PING_SIZE, 0);
QByteArray payload2(PING_SIZE, 0);
for (int i = 0; i < PING_SIZE; ++i) {
payload1[i] = static_cast<char>(i);
payload2[i] = static_cast<char>(255 - i);
}

QVERIFY(nodeA.syncStreamWrite(stream1, payload1).ok);
QVERIFY(nodeA.syncStreamWrite(stream2, payload2).ok);

// read back from each stream and verify no cross-contamination
Libp2pResult read1 = nodeA.syncStreamReadExactly(stream1, PING_SIZE);
QVERIFY(read1.ok);
QCOMPARE(read1.data.value<QByteArray>(), payload1);

Libp2pResult read2 = nodeA.syncStreamReadExactly(stream2, PING_SIZE);
QVERIFY(read2.ok);
QCOMPARE(read2.data.value<QByteArray>(), payload2);

// cleanup both streams
QVERIFY(nodeA.syncStreamCloseWithEOF(stream1).ok);
QVERIFY(nodeA.syncStreamRelease(stream1).ok);
QVERIFY(nodeA.syncStreamCloseWithEOF(stream2).ok);
QVERIFY(nodeA.syncStreamRelease(stream2).ok);

QVERIFY(nodeA.syncLibp2pStop().ok);
QVERIFY(nodeB.syncLibp2pStop().ok);
}

void directDialStreamExchange()
{
const int PING_SIZE = 32;

Libp2pModulePlugin nodeA;
Libp2pModulePlugin nodeB;

QVERIFY(nodeA.syncLibp2pStart().ok);
QVERIFY(nodeB.syncLibp2pStart().ok);

PeerInfo nodeBPeerInfo = nodeB.syncPeerInfo().data.value<PeerInfo>();

// connect A to B
QVERIFY(nodeA.syncConnectPeer(nodeBPeerInfo.peerId, nodeBPeerInfo.addrs, 500).ok);

// A dials B directly on /ipfs/ping/1.0.0
Libp2pResult dialResult = nodeA.syncDial(nodeBPeerInfo.peerId, "/ipfs/ping/1.0.0");
QVERIFY(dialResult.ok);

uint64_t streamId = dialResult.data.value<uint64_t>();
QVERIFY(streamId != 0);

// send ping payload
QByteArray payload(PING_SIZE, 0);
for (int i = 0; i < PING_SIZE; ++i)
payload[i] = static_cast<char>(i);
QVERIFY(nodeA.syncStreamWrite(streamId, payload).ok);

// read ping echo
Libp2pResult readResult = nodeA.syncStreamReadExactly(streamId, PING_SIZE);
QVERIFY(readResult.ok);
QCOMPARE(readResult.data.value<QByteArray>(), payload);

// cleanup
QVERIFY(nodeA.syncStreamCloseWithEOF(streamId).ok);
QVERIFY(nodeA.syncStreamRelease(streamId).ok);

QVERIFY(nodeA.syncLibp2pStop().ok);
QVERIFY(nodeB.syncLibp2pStop().ok);
}
};

QTEST_MAIN(TestIntegration)
Expand Down
Loading