Skip to content

Commit 9860a12

Browse files
authored
feat: add gossipsub (#27)
1 parent 82a87f8 commit 9860a12

File tree

13 files changed

+587
-22
lines changed

13 files changed

+587
-22
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ logos_module(
2626
src/kademlia.cpp
2727
src/stream.cpp
2828
src/mix.cpp
29+
src/gossipsub.cpp
2930
EXTERNAL_LIBS
3031
libp2p
3132
INCLUDE_DIRS

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ It provides:
66
- Peer connectivity
77
- Stream management
88
- Kademlia DHT operations
9+
- Mix operaions
10+
- Gossipsub operaions
911
- Sync and async APIs compatible with Qt
1012

1113
Check the examples/ directory for complete usage demonstrations.

examples/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ use cases using the `logos-libp2p-module`.
2323
- Populating mix pools
2424
- Dialing through mix
2525

26+
- [Gossipsub](gossipsub.cpp) — Demonstrates usage of the [Gossipsub](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/README.md) protocol for messaging:
27+
- Connecting to a peer
28+
- Subscribing to a topic
29+
- Publishing to that topic
30+
- Receiving the message on the subscribed topic
31+
2632
## Building
2733
Enter the development shell and configure cmake
2834

@@ -43,5 +49,6 @@ After building, run the executable from the build directory:
4349
./build/examples/example_ping
4450
./build/examples/example_kademlia
4551
./build/examples/example_mix
52+
./build/examples/example_gossipsub
4653
```
4754

examples/gossipsub.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#include <QCoreApplication>
2+
#include <QDebug>
3+
#include <QThread>
4+
#include "plugin.h"
5+
6+
int main(int argc, char *argv[])
7+
{
8+
QCoreApplication app(argc, argv);
9+
10+
/* -----------------------------
11+
Node A setup
12+
----------------------------- */
13+
Libp2pModulePlugin nodeA;
14+
Libp2pModulePlugin nodeB;
15+
16+
qDebug() << "Starting Nodes...";
17+
if (!nodeA.syncLibp2pStart().ok) {
18+
qFatal("Node A failed to start");
19+
}
20+
if (!nodeB.syncLibp2pStart().ok) {
21+
qFatal("Node B failed to start");
22+
}
23+
24+
PeerInfo nodeAPeerInfo = nodeA.syncPeerInfo().data.value<PeerInfo>();
25+
26+
if (!nodeB.syncConnectPeer(nodeAPeerInfo.peerId, nodeAPeerInfo.addrs, 500).ok) {
27+
qFatal("Node B failed to connect to Node A");
28+
}
29+
30+
/* -----------------------------
31+
Subscribe nodes to topic
32+
----------------------------- */
33+
QString topic = "demo-topic";
34+
qDebug() << "Node B subscribing to topic:" << topic;
35+
if (!nodeB.syncGossipsubSubscribe(topic).ok) {
36+
qFatal("Node B subscription failed");
37+
}
38+
qDebug() << "Node A subscribing to topic:" << topic;
39+
if (!nodeA.syncGossipsubSubscribe(topic).ok) {
40+
qFatal("Node A subscription failed");
41+
}
42+
43+
/* -----------------------------
44+
Give the mesh time to form
45+
----------------------------- */
46+
qDebug() << "Waiting for mesh to form";
47+
QThread::msleep(2000);
48+
49+
/* -----------------------------
50+
Publish message from Node A
51+
----------------------------- */
52+
QByteArray payload = "Hello from Node A via gossipsub!";
53+
qDebug() << "Node A publishing message to topic:" << topic;
54+
if (!nodeA.syncGossipsubPublish(topic, payload).ok) {
55+
qFatal("Node A publish failed");
56+
}
57+
58+
/* -----------------------------
59+
Fetch messages for Node B
60+
----------------------------- */
61+
auto res = nodeB.syncGossipsubNextMessage(topic);
62+
if (!res.ok) {
63+
qFatal("Node B did not receive any messages");
64+
}
65+
QByteArray message = res.data.value<QByteArray>();
66+
qDebug() << "Node B received:" << message;
67+
68+
/* -----------------------------
69+
Cleanup
70+
----------------------------- */
71+
nodeA.syncLibp2pStop();
72+
nodeB.syncLibp2pStop();
73+
74+
qDebug() << "Done";
75+
76+
return 0;
77+
}

flake.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/callbacks.cpp

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,10 @@ void Libp2pModulePlugin::libp2pCallback(
6666
Qt::QueuedConnection
6767
);
6868

69-
delete callbackCtx;
69+
// topicHandler is called with context
70+
// cannot free it here if that's the case
71+
if (caller != "gossipsubSubscribe")
72+
delete callbackCtx;
7073
}
7174

7275
QList<ServiceInfo> copyServiceInfos(
@@ -471,3 +474,49 @@ void Libp2pModulePlugin::connectionCallback(
471474
delete callbackCtx;
472475
}
473476

477+
478+
void Libp2pModulePlugin::topicHandler(
479+
const char *topic,
480+
uint8_t *data,
481+
size_t len,
482+
void *userData
483+
)
484+
{
485+
auto *callbackCtx = static_cast<CallbackContext *>(userData);
486+
if (!callbackCtx) return;
487+
488+
Libp2pModulePlugin *self = callbackCtx->instance;
489+
if (!self) { delete callbackCtx; return; }
490+
491+
QString topicStr = QString::fromUtf8(topic);
492+
QByteArray payload(reinterpret_cast<const char*>(data), static_cast<int>(len));
493+
494+
// add payload to the queue for syncGossipsubNextMessage
495+
{
496+
QMutexLocker lock(&self->m_queueMutex);
497+
498+
// ensure a queue exists for this topic
499+
if (!self->m_topicQueues.contains(topicStr)) {
500+
self->m_topicQueues[topicStr] = QSharedPointer<QQueue<QByteArray>>::create();
501+
}
502+
503+
self->m_topicQueues[topicStr]->enqueue(payload);
504+
self->m_queueCond.wakeOne();
505+
}
506+
507+
QPointer<Libp2pModulePlugin> safeSelf(self);
508+
QMetaObject::invokeMethod(
509+
safeSelf,
510+
[safeSelf, topicStr, payload]() {
511+
if (!safeSelf) return;
512+
emit safeSelf->libp2pEvent(
513+
RET_OK,
514+
QString{}, // no request id (push event)
515+
QString("gossipsubMessage"),
516+
topicStr,
517+
QVariant(payload)
518+
);
519+
},
520+
Qt::QueuedConnection
521+
);
522+
}

src/gossipsub.cpp

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
#include "plugin.h"
2+
3+
#include <QtCore/QUuid>
4+
#include <QtCore/QByteArray>
5+
#include <QtCore/QDebug>
6+
#include <cstring>
7+
8+
QString Libp2pModulePlugin::gossipsubPublish(
9+
const QString &topic,
10+
const QByteArray &data
11+
)
12+
{
13+
if (!ctx)
14+
return {};
15+
16+
QString uuid = QUuid::createUuid().toString();
17+
auto *callbackCtx =
18+
new CallbackContext{ "gossipsubPublish", uuid, this };
19+
20+
QByteArray topicUtf8 = topic.toUtf8();
21+
22+
int ret = libp2p_gossipsub_publish(
23+
ctx,
24+
topicUtf8.constData(),
25+
reinterpret_cast<uint8_t*>(const_cast<char*>(data.constData())),
26+
data.size(),
27+
&Libp2pModulePlugin::libp2pCallback,
28+
callbackCtx
29+
);
30+
31+
if (ret != RET_OK) {
32+
delete callbackCtx;
33+
return {};
34+
}
35+
36+
return uuid;
37+
}
38+
39+
QString Libp2pModulePlugin::gossipsubSubscribe(
40+
const QString &topic
41+
)
42+
{
43+
if (!ctx)
44+
return {};
45+
46+
QString uuid = QUuid::createUuid().toString();
47+
auto *callbackCtx =
48+
new CallbackContext{ "gossipsubSubscribe", uuid, this };
49+
50+
QByteArray topicUtf8 = topic.toUtf8();
51+
52+
int ret = libp2p_gossipsub_subscribe(
53+
ctx,
54+
topicUtf8.constData(),
55+
&Libp2pModulePlugin::topicHandler,
56+
&Libp2pModulePlugin::libp2pCallback,
57+
callbackCtx
58+
);
59+
60+
if (ret != RET_OK) {
61+
delete callbackCtx;
62+
return {};
63+
}
64+
65+
return uuid;
66+
}
67+
68+
QString Libp2pModulePlugin::gossipsubUnsubscribe(
69+
const QString &topic
70+
)
71+
{
72+
if (!ctx)
73+
return {};
74+
75+
QString uuid = QUuid::createUuid().toString();
76+
auto *callbackCtx =
77+
new CallbackContext{ "gossipsubUnsubscribe", uuid, this };
78+
79+
QByteArray topicUtf8 = topic.toUtf8();
80+
81+
int ret = libp2p_gossipsub_unsubscribe(
82+
ctx,
83+
topicUtf8.constData(),
84+
&Libp2pModulePlugin::topicHandler,
85+
&Libp2pModulePlugin::libp2pCallback,
86+
callbackCtx
87+
);
88+
89+
if (ret != RET_OK) {
90+
delete callbackCtx;
91+
return {};
92+
}
93+
94+
return uuid;
95+
}

src/libp2p_module_interface.h

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ class Libp2pModuleInterface : public PluginInterface
145145
Q_INVOKABLE virtual QString peerInfo() = 0;
146146

147147
/// Returns currently connected peers.
148-
/// Possible Direction values: Direction_In = 0, Direction_Out = 1.
148+
/// Possible Direction values: Direction_In, Direction_Out.
149149
/// Returns a UUID string identifying this request.
150-
Q_INVOKABLE virtual QString connectedPeers(int direction = 0) = 0;
150+
Q_INVOKABLE virtual QString connectedPeers(int direction = Direction_In) = 0;
151151

152152
/// Opens a stream using a protocol.
153153
/// Returns a UUID string identifying this request.
@@ -214,6 +214,48 @@ class Libp2pModuleInterface : public PluginInterface
214214
/// data: none
215215
Q_INVOKABLE virtual Libp2pResult syncStreamRelease(uint64_t streamId) = 0;
216216

217+
/* ----------- Gossipsub ----------- */
218+
219+
/// Publish data to a topic
220+
/// Returns a UUID string identifying this request.
221+
Q_INVOKABLE virtual QString gossipsubPublish(
222+
const QString &topic,
223+
const QByteArray &data
224+
) = 0;
225+
226+
/// Subscribe to a topic
227+
/// Returns a UUID string identifying this request.
228+
Q_INVOKABLE virtual QString gossipsubSubscribe(
229+
const QString &topic
230+
) = 0;
231+
232+
/// Unsubscribe from a topic
233+
/// Returns a UUID string identifying this request.
234+
Q_INVOKABLE virtual QString gossipsubUnsubscribe(
235+
const QString &topic
236+
) = 0;
237+
238+
/* ----------- Sync Gossipsub ----------- */
239+
240+
/// data: none
241+
Q_INVOKABLE virtual Libp2pResult syncGossipsubPublish(
242+
const QString &topic,
243+
const QByteArray &data
244+
) = 0;
245+
/// data: none
246+
Q_INVOKABLE virtual Libp2pResult syncGossipsubSubscribe(
247+
const QString &topic
248+
) = 0;
249+
/// data: none
250+
Q_INVOKABLE virtual Libp2pResult syncGossipsubUnsubscribe(
251+
const QString &topic
252+
) = 0;
253+
/// data: QByteArray (message)
254+
Q_INVOKABLE virtual Libp2pResult syncGossipsubNextMessage(
255+
const QString &topic,
256+
int timeoutMs
257+
) = 0;
258+
217259
/* ----------- Kademlia ----------- */
218260

219261
/// Converts a key to a CID.

0 commit comments

Comments
 (0)